From bc7b3ebd669b018f5e91213ef7bf9e5ad26911a8 Mon Sep 17 00:00:00 2001 From: Kevin Leyow Date: Tue, 3 Sep 2024 07:48:47 -0500 Subject: [PATCH] feat(csi/551): add transfer state change for proxied fxTransfer (#1087) * feat(csi/551): add transfer state change for proxied fxTransfer * remove * add case * dep * unit tests * int tests * chore(snapshot): 17.8.0-snapshot.16 --- package-lock.json | 90 +---- package.json | 6 +- src/domain/fx/index.js | 18 + src/handlers/transfers/FxFulfilService.js | 5 +- src/handlers/transfers/dto.js | 6 +- src/handlers/transfers/handler.js | 6 +- src/handlers/transfers/prepare.js | 146 +++++--- src/models/fxTransfer/fxTransfer.js | 19 +- .../handlers/transfers/handlers.test.js | 337 +++++++++++++++++- test/unit/domain/fx/index.test.js | 50 ++- test/unit/handlers/transfers/prepare.test.js | 78 +++- 11 files changed, 622 insertions(+), 139 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3446155cf..ea8c7e208 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.15", + "version": "17.8.0-snapshot.16", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.15", + "version": "17.8.0-snapshot.16", "license": "Apache-2.0", "dependencies": { "@hapi/basic": "7.0.2", @@ -20,7 +20,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.5.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "18.7.2", + "@mojaloop/central-services-shared": "18.7.3", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.6", "@mojaloop/event-sdk": "14.1.1", @@ -58,7 +58,7 @@ "jsdoc": "4.0.3", "jsonpath": "1.1.1", "nodemon": "3.1.4", - "npm-check-updates": "17.1.0", + "npm-check-updates": "17.1.1", "nyc": "17.0.0", "pre-commit": "1.2.2", "proxyquire": "2.1.3", @@ -1623,14 +1623,14 @@ } }, "node_modules/@mojaloop/central-services-shared": { - "version": "18.7.2", - "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.7.2.tgz", - "integrity": "sha512-LuvLkww6scSIYdz+cyo8tghpRgJavcOkCs/9sX4F9s6dunfHgnzzWO4dO45K26PBaQZuuax/KtDHmyOH/nrPfg==", + "version": "18.7.3", + "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.7.3.tgz", + "integrity": "sha512-v8zl5Y+YDVWL1LNIELu1J0DO3iKQpeoKNc00yC7KmcyoRNn+wTfQZLzlXxxmeyyAJyQ7Hgyouq502a2sBxkSrg==", "dependencies": { "@hapi/catbox": "12.1.1", "@hapi/catbox-memory": "5.0.1", - "@mojaloop/inter-scheme-proxy-cache-lib": "2.2.0", - "axios": "1.7.4", + "@mojaloop/inter-scheme-proxy-cache-lib": "2.3.0", + "axios": "1.7.5", "clone": "2.1.2", "dotenv": "16.4.5", "env-var": "7.5.0", @@ -1705,32 +1705,6 @@ "integrity": "sha512-jKtjLLDiH95b002sJVc5c74PE6KKYftuyVdVmsuYId5stTaWcRFqE+5ukZI4gDUKjGn8wv2C3zPn3/nyjEI7gg==", "deprecated": "This version has been deprecated and is no longer supported or maintained" }, - "node_modules/@mojaloop/central-services-shared/node_modules/@mojaloop/inter-scheme-proxy-cache-lib": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/@mojaloop/inter-scheme-proxy-cache-lib/-/inter-scheme-proxy-cache-lib-2.2.0.tgz", - "integrity": "sha512-QrbJlhy7f7Tf1DTjspxqtw0oN3eUAm5zKfCm7moQIYFEV3MYF3rsbODLpgxyzmAO8FFi2Dky/ff7QMVnlA/P9A==", - "dependencies": { - "@mojaloop/central-services-logger": "11.5.0", - "ajv": "^8.17.1", - "convict": "^6.2.4", - "fast-safe-stringify": "^2.1.1", - "ioredis": "^5.4.1" - }, - "engines": { - "node": ">=18.x" - } - }, - "node_modules/@mojaloop/central-services-shared/node_modules/@mojaloop/inter-scheme-proxy-cache-lib/node_modules/@mojaloop/central-services-logger": { - "version": "11.5.0", - "resolved": "https://registry.npmjs.org/@mojaloop/central-services-logger/-/central-services-logger-11.5.0.tgz", - "integrity": "sha512-pH73RiJ5fKTBTSdLocp1vPBad1D+Kh0HufdcfjLaBQj3dIBq72si0k+Z3L1MeOmMqMzpj+8M/he/izlgqJjVJA==", - "dependencies": { - "parse-strings-in-object": "2.0.0", - "rc": "1.2.8", - "safe-stable-stringify": "^2.4.3", - "winston": "3.13.1" - } - }, "node_modules/@mojaloop/central-services-shared/node_modules/raw-body": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/raw-body/-/raw-body-3.0.0.tgz", @@ -1745,40 +1719,6 @@ "node": ">= 0.8" } }, - "node_modules/@mojaloop/central-services-shared/node_modules/readable-stream": { - "version": "3.6.2", - "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", - "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", - "dependencies": { - "inherits": "^2.0.3", - "string_decoder": "^1.1.1", - "util-deprecate": "^1.0.1" - }, - "engines": { - "node": ">= 6" - } - }, - "node_modules/@mojaloop/central-services-shared/node_modules/winston": { - "version": "3.13.1", - "resolved": "https://registry.npmjs.org/winston/-/winston-3.13.1.tgz", - "integrity": "sha512-SvZit7VFNvXRzbqGHsv5KSmgbEYR5EiQfDAL9gxYkRqa934Hnk++zze0wANKtMHcy/gI4W/3xmSDwlhf865WGw==", - "dependencies": { - "@colors/colors": "^1.6.0", - "@dabh/diagnostics": "^2.0.2", - "async": "^3.2.3", - "is-stream": "^2.0.0", - "logform": "^2.6.0", - "one-time": "^1.0.0", - "readable-stream": "^3.4.0", - "safe-stable-stringify": "^2.3.1", - "stack-trace": "0.0.x", - "triple-beam": "^1.3.0", - "winston-transport": "^4.7.0" - }, - "engines": { - "node": ">= 12.0.0" - } - }, "node_modules/@mojaloop/central-services-stream": { "version": "11.3.1", "resolved": "https://registry.npmjs.org/@mojaloop/central-services-stream/-/central-services-stream-11.3.1.tgz", @@ -2669,9 +2609,9 @@ } }, "node_modules/axios": { - "version": "1.7.4", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.4.tgz", - "integrity": "sha512-DukmaFRnY6AzAALSH4J2M3k6PkaC+MfaAGdEERRWcC9q3/TWQwLpHR8ZRLKTdQ3aBDL64EdluRDjJqKw+BPZEw==", + "version": "1.7.5", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.5.tgz", + "integrity": "sha512-fZu86yCo+svH3uqJ/yTdQ0QHpQu5oL+/QE+QPSv6BZSkDAoky9vytxp7u5qk83OJFS3kEBcesWni9WTZAv3tSw==", "dependencies": { "follow-redirects": "^1.15.6", "form-data": "^4.0.0", @@ -9682,9 +9622,9 @@ } }, "node_modules/npm-check-updates": { - "version": "17.1.0", - "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-17.1.0.tgz", - "integrity": "sha512-RcohCA/tdpxyPllBlYDkqGXFJQgTuEt0f2oPSL9s05pZ3hxYdleaUtvEcSxKl0XAg3ncBhVgLAxhXSjoryUU5Q==", + "version": "17.1.1", + "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-17.1.1.tgz", + "integrity": "sha512-2aqIzGAEWB7xPf0hKHTkNmUM5jHbn2S5r2/z/7dA5Ij2h/sVYAg9R/uVkaUC3VORPAfBm7pKkCWo6E9clEVQ9A==", "dev": true, "bin": { "ncu": "build/cli.js", diff --git a/package.json b/package.json index 69402c2d6..7b4f496ec 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.15", + "version": "17.8.0-snapshot.16", "description": "Central ledger hosted by a scheme to record and settle transfers", "license": "Apache-2.0", "author": "ModusBox", @@ -92,7 +92,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.5.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "18.7.2", + "@mojaloop/central-services-shared": "18.7.3", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.6", "@mojaloop/event-sdk": "14.1.1", @@ -133,7 +133,7 @@ "jsdoc": "4.0.3", "jsonpath": "1.1.1", "nodemon": "3.1.4", - "npm-check-updates": "17.1.0", + "npm-check-updates": "17.1.1", "nyc": "17.0.0", "pre-commit": "1.2.2", "proxyquire": "2.1.3", diff --git a/src/domain/fx/index.js b/src/domain/fx/index.js index cef173e06..527d68367 100644 --- a/src/domain/fx/index.js +++ b/src/domain/fx/index.js @@ -54,6 +54,22 @@ const handleFulfilResponse = async (transferId, payload, action, fspiopError) => } } +const forwardedFxPrepare = async (commitRequestId) => { + const histTimerTransferServicePrepareEnd = Metrics.getHistogram( + 'fx_domain_transfer', + 'prepare - Metrics for fx transfer domain', + ['success', 'funcName'] + ).startTimer() + try { + const result = await FxTransferModel.fxTransfer.updateFxPrepareReservedForwarded(commitRequestId) + histTimerTransferServicePrepareEnd({ success: true, funcName: 'forwardedFxPrepare' }) + return result + } catch (err) { + histTimerTransferServicePrepareEnd({ success: false, funcName: 'forwardedFxPrepare' }) + throw ErrorHandler.Factory.reformatFSPIOPError(err) + } +} + // TODO: Need to implement this for fxTransferError // /** // * @function LogFxTransferError @@ -82,6 +98,8 @@ const handleFulfilResponse = async (transferId, payload, action, fspiopError) => const TransferService = { handleFulfilResponse, + forwardedFxPrepare, + getByIdLight: FxTransferModel.fxTransfer.getByIdLight, // logFxTransferError, Cyril } diff --git a/src/handlers/transfers/FxFulfilService.js b/src/handlers/transfers/FxFulfilService.js index fb25c750f..0ca0eea0e 100644 --- a/src/handlers/transfers/FxFulfilService.js +++ b/src/handlers/transfers/FxFulfilService.js @@ -31,7 +31,7 @@ const ErrorHandler = require('@mojaloop/central-services-error-handling') const { Type, Action } = Enum.Events.Event const { SOURCE, DESTINATION } = Enum.Http.Headers.FSPIOP -const { TransferState } = Enum.Transfers +const { TransferState, TransferInternalState } = Enum.Transfers const consumerCommit = true const fromSwitch = true @@ -255,7 +255,8 @@ class FxFulfilService { } async validateTransferState(transfer, functionality) { - if (transfer.transferState !== TransferState.RESERVED) { + if (transfer.transferState !== TransferInternalState.RESERVED && + transfer.transferState !== TransferInternalState.RESERVED_FORWARDED) { const fspiopError = fspiopErrorFactory.fxTransferNonReservedState() const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) const eventDetail = { diff --git a/src/handlers/transfers/dto.js b/src/handlers/transfers/dto.js index 2ee5433bf..6d4b5859f 100644 --- a/src/handlers/transfers/dto.js +++ b/src/handlers/transfers/dto.js @@ -16,11 +16,11 @@ const prepareInputDto = (error, messages) => { if (!message) throw new Error('No input kafka message') const payload = decodePayload(message.value.content.payload) - const isForwarded = message.value.metadata.event.action === Action.FORWARDED - const isFx = !payload.transferId && !isForwarded + const isForwarded = message.value.metadata.event.action === Action.FORWARDED || message.value.metadata.event.action === Action.FX_FORWARDED + const isFx = !payload.transferId const { action } = message.value.metadata.event - const isPrepare = [Action.PREPARE, Action.FX_PREPARE, Action.FORWARDED].includes(action) + const isPrepare = [Action.PREPARE, Action.FX_PREPARE, Action.FORWARDED, Action.FX_FORWARDED].includes(action) const actionLetter = isPrepare ? Enum.Events.ActionLetter.prepare diff --git a/src/handlers/transfers/handler.js b/src/handlers/transfers/handler.js index 98c5da638..bbf1e5686 100644 --- a/src/handlers/transfers/handler.js +++ b/src/handlers/transfers/handler.js @@ -108,7 +108,8 @@ const fulfil = async (error, messages) => { TransferEventAction.FX_COMMIT, TransferEventAction.FX_RESERVE, TransferEventAction.FX_REJECT, - TransferEventAction.FX_ABORT + TransferEventAction.FX_ABORT, + TransferEventAction.FX_FORWARDED ] if (fxActions.includes(action)) { @@ -677,7 +678,8 @@ const processFxFulfilMessage = async (message, functionality, span) => { TransferEventAction.FX_RESERVE, TransferEventAction.FX_COMMIT, // TransferEventAction.FX_REJECT, - TransferEventAction.FX_ABORT + TransferEventAction.FX_ABORT, + TransferEventAction.FX_FORWARDED ] if (!validActions.includes(action)) { const errorMessage = ERROR_MESSAGES.fxActionIsNotAllowed(action) diff --git a/src/handlers/transfers/prepare.js b/src/handlers/transfers/prepare.js index 77a4c7852..ff4c3a610 100644 --- a/src/handlers/transfers/prepare.js +++ b/src/handlers/transfers/prepare.js @@ -38,6 +38,7 @@ const Validator = require('./validator') const dto = require('./dto') const TransferService = require('#src/domain/transfer/index') const ProxyCache = require('#src/lib/proxyCache') +const FxTransferService = require('#src/domain/fx/index') const { Kafka, Comparators } = Util const { TransferState } = Enum.Transfers @@ -102,7 +103,7 @@ const processDuplication = async ({ .getByIdLight(ID) const isFinalized = [TransferState.COMMITTED, TransferState.ABORTED].includes(transfer?.transferStateEnumeration) - const isPrepare = [Action.PREPARE, Action.FX_PREPARE, Action.FORWARDED].includes(action) + const isPrepare = [Action.PREPARE, Action.FX_PREPARE, Action.FORWARDED, Action.FX_FORWARDED].includes(action) if (isFinalized && isPrepare) { logger.info(Util.breadcrumb(location, `finalized callback--${actionLetter}1`)) @@ -296,55 +297,108 @@ const prepare = async (error, messages) => { } if (proxyEnabled && isForwarded) { - const transfer = await TransferService.getById(ID) - if (!transfer) { - const eventDetail = { - functionality: Enum.Events.Event.Type.NOTIFICATION, - action: Enum.Events.Event.Action.FORWARDED - } - const fspiopError = ErrorHandler.Factory.createFSPIOPError( - ErrorHandler.Enums.FSPIOPErrorCodes.ID_NOT_FOUND, - 'Forwarded transfer could not be found.' - ).toApiErrorObject(Config.ERROR_HANDLING) - // IMPORTANT: This singular message is taken by the ml-api-adapter and used to - // notify the payerFsp and proxy of the error. - // As long as the `to` and `from` message values are the payer and payee, - // and the action is `forwarded`, the ml-api-adapter will notify both. - await Kafka.proceed( - Config.KAFKA_CONFIG, - params, - { - consumerCommit, - fspiopError, - eventDetail + if (isFx) { + const fxTransfer = await FxTransferService.getByIdLight(ID) + if (!fxTransfer) { + const eventDetail = { + functionality: Enum.Events.Event.Type.NOTIFICATION, + action: Enum.Events.Event.Action.FX_FORWARDED } - ) - return true - } - - if (transfer.transferState === Enum.Transfers.TransferInternalState.RESERVED) { - await TransferService.forwardedPrepare(ID) + const fspiopError = ErrorHandler.Factory.createFSPIOPError( + ErrorHandler.Enums.FSPIOPErrorCodes.ID_NOT_FOUND, + 'Forwarded fxTransfer could not be found.' + ).toApiErrorObject(Config.ERROR_HANDLING) + // IMPORTANT: This singular message is taken by the ml-api-adapter and used to + // notify the payerFsp and proxy of the error. + // As long as the `to` and `from` message values are the fsp and fxp, + // and the action is `fx-forwarded`, the ml-api-adapter will notify both. + await Kafka.proceed( + Config.KAFKA_CONFIG, + params, + { + consumerCommit, + fspiopError, + eventDetail + } + ) + return true + } else { + if (fxTransfer.fxTransferState === Enum.Transfers.TransferInternalState.RESERVED) { + await FxTransferService.forwardedFxPrepare(ID) + } else { + const eventDetail = { + functionality: Enum.Events.Event.Type.NOTIFICATION, + action: Enum.Events.Event.Action.FX_FORWARDED + } + const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError( + `Invalid State: ${fxTransfer.fxTransferState} - expected: ${Enum.Transfers.TransferInternalState.RESERVED}` + ).toApiErrorObject(Config.ERROR_HANDLING) + // IMPORTANT: This singular message is taken by the ml-api-adapter and used to + // notify the payerFsp and proxy of the error. + // As long as the `to` and `from` message values are the fsp and fxp, + // and the action is `fx-forwarded`, the ml-api-adapter will notify both. + await Kafka.proceed( + Config.KAFKA_CONFIG, + params, + { + consumerCommit, + fspiopError, + eventDetail + } + ) + } + } } else { - const eventDetail = { - functionality: Enum.Events.Event.Type.NOTIFICATION, - action: Enum.Events.Event.Action.FORWARDED + const transfer = await TransferService.getById(ID) + if (!transfer) { + const eventDetail = { + functionality: Enum.Events.Event.Type.NOTIFICATION, + action: Enum.Events.Event.Action.FORWARDED + } + const fspiopError = ErrorHandler.Factory.createFSPIOPError( + ErrorHandler.Enums.FSPIOPErrorCodes.ID_NOT_FOUND, + 'Forwarded transfer could not be found.' + ).toApiErrorObject(Config.ERROR_HANDLING) + // IMPORTANT: This singular message is taken by the ml-api-adapter and used to + // notify the payerFsp and proxy of the error. + // As long as the `to` and `from` message values are the payer and payee, + // and the action is `forwarded`, the ml-api-adapter will notify both. + await Kafka.proceed( + Config.KAFKA_CONFIG, + params, + { + consumerCommit, + fspiopError, + eventDetail + } + ) + return true } - const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError( - `Invalid State: ${transfer.transferState} - expected: ${Enum.Transfers.TransferInternalState.RESERVED}` - ).toApiErrorObject(Config.ERROR_HANDLING) - // IMPORTANT: This singular message is taken by the ml-api-adapter and used to - // notify the payerFsp and proxy of the error. - // As long as the `to` and `from` message values are the payer and payee, - // and the action is `forwarded`, the ml-api-adapter will notify both. - await Kafka.proceed( - Config.KAFKA_CONFIG, - params, - { - consumerCommit, - fspiopError, - eventDetail + + if (transfer.transferState === Enum.Transfers.TransferInternalState.RESERVED) { + await TransferService.forwardedPrepare(ID) + } else { + const eventDetail = { + functionality: Enum.Events.Event.Type.NOTIFICATION, + action: Enum.Events.Event.Action.FORWARDED } - ) + const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError( + `Invalid State: ${transfer.transferState} - expected: ${Enum.Transfers.TransferInternalState.RESERVED}` + ).toApiErrorObject(Config.ERROR_HANDLING) + // IMPORTANT: This singular message is taken by the ml-api-adapter and used to + // notify the payerFsp and proxy of the error. + // As long as the `to` and `from` message values are the payer and payee, + // and the action is `forwarded`, the ml-api-adapter will notify both. + await Kafka.proceed( + Config.KAFKA_CONFIG, + params, + { + consumerCommit, + fspiopError, + eventDetail + } + ) + } } return true } diff --git a/src/models/fxTransfer/fxTransfer.js b/src/models/fxTransfer/fxTransfer.js index b5b1fa01e..0e542f1c1 100644 --- a/src/models/fxTransfer/fxTransfer.js +++ b/src/models/fxTransfer/fxTransfer.js @@ -372,6 +372,7 @@ const saveFxFulfilResponse = async (commitRequestId, payload, action, fspiopErro switch (action) { case TransferEventAction.FX_COMMIT: case TransferEventAction.FX_RESERVE: + case TransferEventAction.FX_FORWARDED: state = TransferInternalState.RECEIVED_FULFIL_DEPENDENT // extensionList = payload && payload.extensionList isFulfilment = true @@ -505,6 +506,21 @@ const saveFxFulfilResponse = async (commitRequestId, payload, action, fspiopErro } } +const updateFxPrepareReservedForwarded = async function (commitRequestId) { + try { + const knex = await Db.getKnex() + return await knex('fxTransferStateChange') + .insert({ + commitRequestId, + transferStateId: TransferInternalState.RESERVED_FORWARDED, + reason: null, + createdDate: Time.getUTCString(new Date()) + }) + } catch (err) { + throw ErrorHandler.Factory.reformatFSPIOPError(err) + } +} + module.exports = { getByCommitRequestId, getByDeterminingTransferId, @@ -513,5 +529,6 @@ module.exports = { savePreparedRequest, saveFxFulfilResponse, saveFxTransfer, - getAllDetailsByCommitRequestIdForProxiedFxTransfer + getAllDetailsByCommitRequestIdForProxiedFxTransfer, + updateFxPrepareReservedForwarded } diff --git a/test/integration-override/handlers/transfers/handlers.test.js b/test/integration-override/handlers/transfers/handlers.test.js index cda70252d..561e6d1f2 100644 --- a/test/integration-override/handlers/transfers/handlers.test.js +++ b/test/integration-override/handlers/transfers/handlers.test.js @@ -52,6 +52,7 @@ const ParticipantCurrencyCached = require('#src/models/participant/participantCu const ParticipantLimitCached = require('#src/models/participant/participantLimitCached') const SettlementModelCached = require('#src/models/settlement/settlementModelCached') const TransferService = require('#src/domain/transfer/index') +const FxTransferService = require('#src/domain/fx/index') const Handlers = { index: require('#src/handlers/register'), @@ -360,7 +361,8 @@ const prepareTestData = async (dataObj) => { type: 'application/json', content: { payload: { - proxyId: 'test' + proxyId: 'test', + transferId: transferPayload.transferId } }, metadata: { @@ -377,6 +379,31 @@ const prepareTestData = async (dataObj) => { } } + const messageProtocolPrepareFxForwarded = { + id: fxTransferPayload.commitRequestId, + from: 'payerFsp', + to: 'proxyFsp', + type: 'application/json', + content: { + payload: { + proxyId: 'test', + commitRequestId: fxTransferPayload.commitRequestId + } + }, + metadata: { + event: { + id: transferPayload.transferId, + type: TransferEventType.PREPARE, + action: TransferEventAction.FX_FORWARDED, + createdAt: dataObj.now, + state: { + status: 'success', + code: 0 + } + } + } + } + const messageProtocolFxPrepare = Util.clone(messageProtocolPrepare) messageProtocolFxPrepare.id = randomUUID() messageProtocolFxPrepare.from = fxTransferPayload.initiatingFsp @@ -418,10 +445,16 @@ const prepareTestData = async (dataObj) => { const messageProtocolError = Util.clone(messageProtocolFulfil) messageProtocolError.id = randomUUID() - messageProtocolFulfil.content.uriParams = { id: transferPayload.transferId } + messageProtocolError.content.uriParams = { id: transferPayload.transferId } messageProtocolError.content.payload = errorPayload messageProtocolError.metadata.event.action = TransferEventAction.ABORT + const messageProtocolFxError = Util.clone(messageProtocolFxFulfil) + messageProtocolFxError.id = randomUUID() + messageProtocolFxError.content.uriParams = { id: fxTransferPayload.commitRequestId } + messageProtocolFxError.content.payload = errorPayload + messageProtocolFxError.metadata.event.action = TransferEventAction.FX_ABORT + const topicConfTransferPrepare = Utility.createGeneralTopicConf(Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, TransferEventType.TRANSFER, TransferEventType.PREPARE) const topicConfTransferFulfil = Utility.createGeneralTopicConf(Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, TransferEventType.TRANSFER, TransferEventType.FULFIL) @@ -434,7 +467,9 @@ const prepareTestData = async (dataObj) => { errorPayload, messageProtocolPrepare, messageProtocolPrepareForwarded, + messageProtocolPrepareFxForwarded, messageProtocolFxPrepare, + messageProtocolFxError, messageProtocolFulfil, messageProtocolFxFulfil, messageProtocolReject, @@ -591,7 +626,10 @@ Test('Handlers test', async handlersTest => { }) await transferForwarded.test('not timeout transfer in RESERVED_FORWARDED internal transfer state', async (test) => { - const td = await prepareTestData(testData) + const expiringTestData = Util.clone(testData) + expiringTestData.expiration = new Date((new Date()).getTime() + 5000) + + const td = await prepareTestData(expiringTestData) const prepareConfig = Utility.getKafkaConfig( Config.KAFKA_CONFIG, Enum.Kafka.Config.PRODUCER, @@ -843,6 +881,299 @@ Test('Handlers test', async handlersTest => { transferForwarded.end() }) + await handlersTest.test('transferFxForwarded should', async transferFxForwarded => { + await transferFxForwarded.test('should update fxTransfer internal state on prepare event fx-forwarded action', async (test) => { + const td = await prepareTestData(testData) + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'fx-prepare', + keyFilter: td.payer.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position fx-prepare message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + await Producer.produceMessage(td.messageProtocolPrepareFxForwarded, td.topicConfTransferPrepare, prepareConfig) + + await new Promise(resolve => setTimeout(resolve, 5000)) + + try { + const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {} + test.equal(fxTransfer?.fxTransferState, TransferInternalState.RESERVED_FORWARDED, 'FxTransfer state updated to RESERVED_FORWARDED') + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + testConsumer.clearEvents() + test.end() + }) + + await transferFxForwarded.test('not timeout fxTransfer in RESERVED_FORWARDED internal transfer state', async (test) => { + const expiringTestData = Util.clone(testData) + expiringTestData.expiration = new Date((new Date()).getTime() + 5000) + + const td = await prepareTestData(expiringTestData) + + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'fx-prepare', + keyFilter: td.payer.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position fx-prepare message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + await Producer.produceMessage(td.messageProtocolPrepareFxForwarded, td.topicConfTransferPrepare, prepareConfig) + await new Promise(resolve => setTimeout(resolve, 5000)) + try { + const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {} + test.equal(fxTransfer?.fxTransferState, TransferInternalState.RESERVED_FORWARDED, 'FxTransfer state updated to RESERVED_FORWARDED') + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + + await new Promise(resolve => setTimeout(resolve, 5000)) + try { + const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {} + test.equal(fxTransfer?.fxTransferState, TransferInternalState.RESERVED_FORWARDED, 'FxTransfer still in RESERVED_FORWARDED') + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + testConsumer.clearEvents() + test.end() + }) + + await transferFxForwarded.test('should be able to transition from RESERVED_FORWARDED to COMMITED on fx-fulfil', async (test) => { + const td = await prepareTestData(testData) + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'fx-prepare', + keyFilter: td.payer.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position fx-prepare message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + await Producer.produceMessage(td.messageProtocolPrepareFxForwarded, td.topicConfTransferPrepare, prepareConfig) + await new Promise(resolve => setTimeout(resolve, 5000)) + try { + const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {} + test.equal(fxTransfer?.fxTransferState, TransferInternalState.RESERVED_FORWARDED, 'FxTransfer state updated to RESERVED_FORWARDED') + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + + // Fulfil the fxTransfer + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger + + await Producer.produceMessage(td.messageProtocolFxFulfil, td.topicConfTransferFulfil, fulfilConfig) + + try { + const positionFxFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-notification-event', + action: 'fx-reserve', + valueToFilter: td.payer.name + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFxFulfil[0], 'Position fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + try { + const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {} + test.equal(fxTransfer?.fxTransferState, TransferInternalState.COMMITTED, 'FxTransfer state updated to COMMITTED') + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + + testConsumer.clearEvents() + test.end() + }) + + await transferFxForwarded.test('should be able to transition from RESERVED_FORWARDED to RECEIVED_ERROR and ABORTED_ERROR on fx-fulfil error', async (test) => { + const td = await prepareTestData(testData) + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'fx-prepare', + keyFilter: td.payer.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position fx-prepare message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + await Producer.produceMessage(td.messageProtocolPrepareFxForwarded, td.topicConfTransferPrepare, prepareConfig) + await new Promise(resolve => setTimeout(resolve, 5000)) + try { + const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {} + test.equal(fxTransfer?.fxTransferState, TransferInternalState.RESERVED_FORWARDED, 'FxTransfer state updated to RESERVED_FORWARDED') + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + + // Fulfil the fxTransfer + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger + + console.log('messageProtocolFxError', td.messageProtocolFxError) + await Producer.produceMessage(td.messageProtocolFxError, td.topicConfTransferFulfil, fulfilConfig) + await new Promise(resolve => setTimeout(resolve, 5000)) + try { + const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {} + test.equal(fxTransfer?.fxTransferState, TransferInternalState.ABORTED_ERROR, 'FxTransfer state updated to ABORTED_ERROR') + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + + testConsumer.clearEvents() + test.end() + }) + + await transferFxForwarded.test('should create notification message if fxTransfer is not found', async (test) => { + const td = await prepareTestData(testData) + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + + await Producer.produceMessage(td.messageProtocolPrepareFxForwarded, td.topicConfTransferPrepare, prepareConfig) + + try { + const notificationMessages = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-notification-event', + action: 'fx-forwarded' + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(notificationMessages[0], 'notification message found') + test.equal(notificationMessages[0].value.to, 'proxyFsp') + test.equal(notificationMessages[0].value.from, 'payerFsp') + test.equal( + notificationMessages[0].value.content.payload.errorInformation.errorDescription, + 'Generic ID not found - Forwarded fxTransfer could not be found.' + ) + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + testConsumer.clearEvents() + test.end() + }) + + await transferFxForwarded.test('should create notification message if transfer is found in incorrect state', async (test) => { + const expiredTestData = Util.clone(testData) + expiredTestData.expiration = new Date((new Date()).getTime() + 3000) + + const td = await prepareTestData(expiredTestData) + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) + await new Promise(resolve => setTimeout(resolve, 3000)) + + try { + await wrapWithRetries(async () => { + const fxTransfer = await FxTransferService.getByIdLight(td.messageProtocolFxPrepare.content.payload.commitRequestId) || {} + if (fxTransfer?.fxTransferState !== TransferInternalState.EXPIRED_RESERVED) { + if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) + return null + } + return fxTransfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + + // Send the prepare forwarded message after the prepare message has timed out + await Producer.produceMessage(td.messageProtocolPrepareFxForwarded, td.topicConfTransferPrepare, prepareConfig) + + try { + const notificationMessages = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-notification-event', + action: 'fx-forwarded' + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(notificationMessages[0], 'notification message found') + test.equal(notificationMessages[0].value.to, 'proxyFsp') + test.equal(notificationMessages[0].value.from, 'payerFsp') + test.equal( + notificationMessages[0].value.content.payload.errorInformation.errorDescription, + 'Internal server error - Invalid State: EXPIRED_RESERVED - expected: RESERVED' + ) + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + testConsumer.clearEvents() + test.end() + }) + transferFxForwarded.end() + }) + await handlersTest.test('transferFulfil should', async transferFulfil => { await transferFulfil.test('should create position fulfil message to override topic name in config', async (test) => { const td = await prepareTestData(testData) diff --git a/test/unit/domain/fx/index.test.js b/test/unit/domain/fx/index.test.js index 78c2f8cb4..0a2300e9d 100644 --- a/test/unit/domain/fx/index.test.js +++ b/test/unit/domain/fx/index.test.js @@ -12,6 +12,7 @@ const TransferEventAction = Enum.Events.Event.Action Test('Fx', fxIndexTest => { let sandbox let payload + let fxPayload fxIndexTest.beforeEach(t => { sandbox = Sinon.createSandbox() sandbox.stub(Logger, 'isDebugEnabled').value(true) @@ -40,7 +41,22 @@ Test('Fx', fxIndexTest => { ] } } - + fxPayload = { + commitRequestId: '88622a75-5bde-4da4-a6cc-f4cd23b268c4', + determiningTransferId: 'c05c3f31-33b5-4e33-8bfd-7c3a2685fb6c', + condition: 'YlK5TZyhflbXaDRPtR5zhCu8FrbgvrQwwmzuH0iQ0AI', + expiration: new Date((new Date()).getTime() + (24 * 60 * 60 * 1000)), // tomorrow + initiatingFsp: 'dfsp1', + counterPartyFsp: 'fx_dfsp', + sourceAmount: { + currency: 'USD', + amount: '433.88' + }, + targetAmount: { + currency: 'EUR', + amount: '200.00' + } + } t.end() }) @@ -80,5 +96,37 @@ Test('Fx', fxIndexTest => { handleFulfilResponseTest.end() }) + + fxIndexTest.test('forwardedPrepare should', forwardedPrepareTest => { + forwardedPrepareTest.test('commit transfer', async (test) => { + try { + fxTransfer.updateFxPrepareReservedForwarded.returns(Promise.resolve()) + await Fx.forwardedFxPrepare(fxPayload.commitRequestId) + test.ok(fxTransfer.updateFxPrepareReservedForwarded.calledWith(fxPayload.commitRequestId)) + test.pass() + test.end() + } catch (err) { + Logger.error(`handlePayeeResponse failed with error - ${err}`) + test.fail() + test.end() + } + }) + + forwardedPrepareTest.test('throw error', async (test) => { + try { + fxTransfer.updateFxPrepareReservedForwarded.throws(new Error()) + await Fx.forwardedFxPrepare(fxPayload.commitRequestId) + test.fail('Error not thrown') + test.end() + } catch (err) { + Logger.error(`handlePayeeResponse failed with error - ${err}`) + test.pass('Error thrown') + test.end() + } + }) + + forwardedPrepareTest.end() + }) + fxIndexTest.end() }) diff --git a/test/unit/handlers/transfers/prepare.test.js b/test/unit/handlers/transfers/prepare.test.js index 8fe9aa090..cabaa7b0e 100644 --- a/test/unit/handlers/transfers/prepare.test.js +++ b/test/unit/handlers/transfers/prepare.test.js @@ -38,6 +38,7 @@ const Kafka = require('@mojaloop/central-services-shared').Util.Kafka const ErrorHandler = require('@mojaloop/central-services-error-handling') const Validator = require('../../../../src/handlers/transfers/validator') const TransferService = require('../../../../src/domain/transfer') +const FxTransferService = require('../../../../src/domain/fx') const Cyril = require('../../../../src/domain/fx/cyril') const TransferObjectTransform = require('../../../../src/domain/transfer/transform') const MainUtil = require('@mojaloop/central-services-shared').Util @@ -198,7 +199,8 @@ const messageForwardedProtocol = { content: { uriParams: { id: transfer.transferId }, payload: { - proxyId: '' + proxyId: '', + transferId: transfer.transferId } }, metadata: { @@ -216,6 +218,33 @@ const messageForwardedProtocol = { pp: '' } +const messageFxForwardedProtocol = { + id: randomUUID(), + from: '', + to: '', + type: 'application/json', + content: { + uriParams: { id: fxTransfer.commitRequestId }, + payload: { + proxyId: '', + commitRequestId: fxTransfer.commitRequestId + } + }, + metadata: { + event: { + id: randomUUID(), + type: 'prepare', + action: 'fx-forwarded', + createdAt: new Date(), + state: { + status: 'success', + code: 0 + } + } + }, + pp: '' +} + const messageProtocolBulkPrepare = MainUtil.clone(messageProtocol) messageProtocolBulkPrepare.metadata.event.action = 'bulk-prepare' const messageProtocolBulkCommit = MainUtil.clone(messageProtocol) @@ -248,6 +277,13 @@ const forwardedMessages = [ } ] +const fxForwardedMessages = [ + { + topic: topicName, + value: messageFxForwardedProtocol + } +] + const config = { options: { mode: 2, @@ -381,6 +417,7 @@ Test('Transfer handler', transferHandlerTest => { sandbox.stub(Comparators) sandbox.stub(Validator) sandbox.stub(TransferService) + sandbox.stub(FxTransferService) sandbox.stub(fxTransferModel.fxTransfer) sandbox.stub(fxTransferModel.watchList) sandbox.stub(fxDuplicateCheck) @@ -983,7 +1020,7 @@ Test('Transfer handler', transferHandlerTest => { } }) - prepareTest.test('produce error for unexpected state', async (test) => { + prepareTest.test('produce error for unexpected state when receiving fowarded event message', async (test) => { await Consumer.createHandler(topicName, config, command) Kafka.transformAccountToTopicName.returns(topicName) Kafka.proceed.returns(true) @@ -998,7 +1035,7 @@ Test('Transfer handler', transferHandlerTest => { test.end() }) - prepareTest.test('produce error on transfer not found', async (test) => { + prepareTest.test('produce error on transfer not found when receiving forwarded event message', async (test) => { await Consumer.createHandler(topicName, config, command) Kafka.transformAccountToTopicName.returns(topicName) Kafka.proceed.returns(true) @@ -1013,6 +1050,30 @@ Test('Transfer handler', transferHandlerTest => { test.end() }) + prepareTest.test('produce error for unexpected state when receiving fx-fowarded event message', async (test) => { + await Consumer.createHandler(topicName, config, command) + Kafka.transformAccountToTopicName.returns(topicName) + Kafka.proceed.returns(true) + FxTransferService.getByIdLight.returns(Promise.resolve({ fxTransferState: Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT })) + + const result = await allTransferHandlers.prepare(null, fxForwardedMessages[0]) + test.equal(Kafka.proceed.getCall(0).args[2].fspiopError.errorInformation.errorCode, ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR.code) + test.equal(result, true) + test.end() + }) + + prepareTest.test('produce error on transfer not found when receiving fx-forwarded event message', async (test) => { + await Consumer.createHandler(topicName, config, command) + Kafka.transformAccountToTopicName.returns(topicName) + Kafka.proceed.returns(true) + FxTransferService.getByIdLight.returns(Promise.resolve(null)) + + const result = await allTransferHandlers.prepare(null, fxForwardedMessages[0]) + test.equal(result, true) + test.equal(Kafka.proceed.getCall(0).args[2].fspiopError.errorInformation.errorCode, ErrorHandler.Enums.FSPIOPErrorCodes.ID_NOT_FOUND.code) + test.end() + }) + prepareTest.end() }) @@ -1395,6 +1456,17 @@ Test('Transfer handler', transferHandlerTest => { test.end() }) + prepareProxyTest.test('update reserved fxTransfer on fx-forwarded prepare message', async (test) => { + await Consumer.createHandler(topicName, config, command) + Kafka.transformAccountToTopicName.returns(topicName) + Kafka.proceed.returns(true) + FxTransferService.getByIdLight.returns(Promise.resolve({ fxTransferState: Enum.Transfers.TransferInternalState.RESERVED })) + const result = await allTransferHandlers.prepare(null, fxForwardedMessages[0]) + test.ok(FxTransferService.forwardedFxPrepare.called) + test.equal(result, true) + test.end() + }) + prepareProxyTest.end() })