From 2d7abfe72fabafe2a3f4092fa16d4ff7dde488b2 Mon Sep 17 00:00:00 2001 From: vijayg10 <33152110+vijayg10@users.noreply.github.com> Date: Tue, 6 Aug 2024 19:12:58 +0530 Subject: [PATCH] feat: fulfil obligation tracking (#1063) * feat(csi-22): add proxy lib to handlers * diff * add * int tests * fix hanging int tests * fixes? * unit fixes? * coverage * feat: add zero adjustment for prepare position batch * feat: refactor proxy cache integration * feat: restore default * feat: minor optimization * test: update coverage * test: remove try-catch * fix: fix disconnect error * feat(prepare-position): add proxy substitution and zero adjustment logic * fix: remove uneeded async * feat: proxy cache update (#1061) * addressed comments * chore: refactor * test: add unit tests * chore: minor refactor * chore: lint * feat: revert prepare hadnler change, update test coverage * feat: update docker compose and default config for docker * chore: remove commented code * test: update test * test: update test * feat: added proxy check in fulfil handler * fix: derive fn * fix: checkSameCreditorDebtorProxy * unit tests * unit tests * int tests * fix: unit tests * chore: added unit tests for proxyCache deriveCurrencyId function * chore: added coverage * stuff * some int tests * comments * pass object * messy but working * coverage * hanging int test? * fix int tests * feat: refactor * clarify naming * comment * feat: added more test coverage * fixes? * dep update * fix: int tests * fix: disable tests around fspiop header validation in fulfil * fix: int tests * chore: disabled a fulfil test due to and issue in position handler * fix: int tests * chore: addressed pr comment * fix: lint * fix: integration tests --------- Co-authored-by: Kevin Leyow Co-authored-by: Steven Oderayi --- README.md | 4 +- package-lock.json | 8 +- package.json | 2 +- src/domain/fx/cyril.js | 113 +++-- src/domain/position/binProcessor.js | 8 +- src/handlers/transfers/handler.js | 135 +++--- src/lib/proxyCache.js | 33 ++ src/models/transfer/facade.js | 2 + .../handlers/transfers/handlers.test.js | 327 ++++++++++++++ test/unit/domain/fx/cyril.test.js | 403 +++++++++++++++++- test/unit/handlers/transfers/handler.test.js | 2 + test/unit/lib/proxyCache.test.js | 43 ++ test/unit/models/transfer/facade.test.js | 2 + 13 files changed, 961 insertions(+), 121 deletions(-) diff --git a/README.md b/README.md index a343f72e5..41f0f1817 100644 --- a/README.md +++ b/README.md @@ -191,7 +191,7 @@ If you want to run integration tests in a repetitive manner, you can startup the Start containers required for Integration Tests ```bash - docker-compose -f docker-compose.yml up -d mysql kafka init-kafka kafka-debug-console + docker-compose -f docker-compose.yml up -d mysql kafka init-kafka kafka-debug-console redis ``` Run wait script which will report once all required containers are up and running @@ -242,7 +242,7 @@ If you want to run override position topic tests you can repeat the above and us #### For running integration tests for batch processing interactively - Run dependecies ``` -docker-compose up -d mysql kafka init-kafka kafka-debug-console +docker-compose up -d mysql kafka init-kafka kafka-debug-console redis npm run wait-4-docker ``` - Run central-ledger services diff --git a/package-lock.json b/package-lock.json index 0fd5527f1..2ed34405c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -58,7 +58,7 @@ "jsdoc": "4.0.3", "jsonpath": "1.1.1", "nodemon": "3.1.4", - "npm-check-updates": "17.0.0", + "npm-check-updates": "17.0.3", "nyc": "17.0.0", "pre-commit": "1.2.2", "proxyquire": "2.1.3", @@ -9632,9 +9632,9 @@ } }, "node_modules/npm-check-updates": { - "version": "17.0.0", - "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-17.0.0.tgz", - "integrity": "sha512-rXJTiUYBa+GzlvPgemFlwlTdsqS2C16trlW58d9it8u3Hnp0M+Fzmd3NsYBFCjlRlgMZwzuCIBKd9bvIz6yx0Q==", + "version": "17.0.3", + "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-17.0.3.tgz", + "integrity": "sha512-3UWnsnijmx4u9GnICHVCChz6JnhVLmYWqazoedWjLSY6hZB/QhMCps07vBbDmjWnHMhpl6YseAtFlvGbUq9Yrw==", "dev": true, "bin": { "ncu": "build/cli.js", diff --git a/package.json b/package.json index 95c072604..f70508722 100644 --- a/package.json +++ b/package.json @@ -133,7 +133,7 @@ "jsdoc": "4.0.3", "jsonpath": "1.1.1", "nodemon": "3.1.4", - "npm-check-updates": "17.0.0", + "npm-check-updates": "17.0.3", "nyc": "17.0.0", "pre-commit": "1.2.2", "proxyquire": "2.1.3", diff --git a/src/domain/fx/cyril.js b/src/domain/fx/cyril.js index 393dcb983..88eaaf8d2 100644 --- a/src/domain/fx/cyril.js +++ b/src/domain/fx/cyril.js @@ -26,9 +26,9 @@ const Metrics = require('@mojaloop/central-services-metrics') const { Enum } = require('@mojaloop/central-services-shared') const TransferModel = require('../../models/transfer/transfer') -const ParticipantFacade = require('../../models/participant/facade') const { fxTransfer, watchList } = require('../../models/fxTransfer') const Config = require('../../lib/config') +const ProxyCache = require('../../lib/proxyCache') const checkIfDeterminingTransferExistsForTransferMessage = async (payload) => { // Does this determining transfer ID appear on the watch list? @@ -242,17 +242,15 @@ const processFulfilMessage = async (transferId, payload, transfer) => { // Create obligation between FXP and FX requesting party in currency of reservation // Find out the participantCurrencyId of the initiatingFsp // The following is hardcoded for Payer side conversion with SEND amountType. - const participantCurrency = await ParticipantFacade.getByNameAndCurrency( - fxTransferRecord.initiatingFspName, - fxTransferRecord.targetCurrency, - Enum.Accounts.LedgerAccountType.POSITION - ) - result.positionChanges.push({ - isFxTransferStateChange: false, - transferId, - participantCurrencyId: participantCurrency.participantCurrencyId, - amount: -fxTransferRecord.targetAmount - }) + const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(fxTransferRecord.initiatingFspName, fxTransferRecord.targetCurrency) + if (proxyParticipantAccountDetails.participantCurrencyId) { + result.positionChanges.push({ + isFxTransferStateChange: false, + transferId, + participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId, + amount: -fxTransferRecord.targetAmount + }) + } // TODO: Send PATCH notification to FXP } @@ -262,12 +260,15 @@ const processFulfilMessage = async (transferId, payload, transfer) => { sendingFxpExists = true sendingFxpRecord = fxTransferRecord // Create obligation between FX requesting party and FXP in currency of reservation - result.positionChanges.push({ - isFxTransferStateChange: true, - commitRequestId: fxTransferRecord.commitRequestId, - participantCurrencyId: fxTransferRecord.counterPartyFspSourceParticipantCurrencyId, - amount: -fxTransferRecord.sourceAmount - }) + const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(fxTransferRecord.counterPartyFspName, fxTransferRecord.sourceCurrency) + if (proxyParticipantAccountDetails.participantCurrencyId) { + result.positionChanges.push({ + isFxTransferStateChange: true, + commitRequestId: fxTransferRecord.commitRequestId, + participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId, + amount: -fxTransferRecord.sourceAmount + }) + } // TODO: Send PATCH notification to FXP } } @@ -279,34 +280,64 @@ const processFulfilMessage = async (transferId, payload, transfer) => { if (sendingFxpExists && receivingFxpExists) { // If we have both a sending and a receiving FXP, Create obligation between sending and receiving FXP in currency of transfer. - result.positionChanges.push({ - isFxTransferStateChange: true, - commitRequestId: receivingFxpRecord.commitRequestId, - participantCurrencyId: receivingFxpRecord.counterPartyFspSourceParticipantCurrencyId, - amount: -receivingFxpRecord.sourceAmount - }) + const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(receivingFxpRecord.counterPartyFspName, receivingFxpRecord.sourceCurrency) + if (proxyParticipantAccountDetails.participantCurrencyId) { + result.positionChanges.push({ + isFxTransferStateChange: true, + commitRequestId: receivingFxpRecord.commitRequestId, + participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId, + amount: -receivingFxpRecord.sourceAmount + }) + } } else if (sendingFxpExists) { // If we have a sending FXP, Create obligation between FXP and creditor party to the transfer in currency of FX transfer // Get participantCurrencyId for transfer.payeeParticipantId/transfer.payeeFsp and sendingFxpRecord.targetCurrency - const participantCurrency = await ParticipantFacade.getByNameAndCurrency( - transfer.payeeFsp, - sendingFxpRecord.targetCurrency, - Enum.Accounts.LedgerAccountType.POSITION - ) - result.positionChanges.push({ - isFxTransferStateChange: false, - transferId, - participantCurrencyId: participantCurrency.participantCurrencyId, - amount: -sendingFxpRecord.targetAmount - }) + const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(transfer.payeeFsp, sendingFxpRecord.targetCurrency) + if (proxyParticipantAccountDetails.participantCurrencyId) { + let isPositionChange = false + if (proxyParticipantAccountDetails.inScheme) { + isPositionChange = true + } else { + // We are not expecting this. Payee participant is a proxy and have an account in the targetCurrency. + // In this case we need to check if FXP is also a proxy and have the same account as payee. + const proxyParticipantAccountDetails2 = await ProxyCache.getProxyParticipantAccountDetails(sendingFxpRecord.counterPartyFspName, sendingFxpRecord.targetCurrency) + if (!proxyParticipantAccountDetails2.inScheme && (proxyParticipantAccountDetails.participantCurrencyId !== proxyParticipantAccountDetails2.participantCurrencyId)) { + isPositionChange = true + } + } + if (isPositionChange) { + result.positionChanges.push({ + isFxTransferStateChange: false, + transferId, + participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId, + amount: -sendingFxpRecord.targetAmount + }) + } + } } else if (receivingFxpExists) { // If we have a receiving FXP, Create obligation between debtor party to the transfer and FXP in currency of transfer - result.positionChanges.push({ - isFxTransferStateChange: true, - commitRequestId: receivingFxpRecord.commitRequestId, - participantCurrencyId: receivingFxpRecord.counterPartyFspSourceParticipantCurrencyId, - amount: -receivingFxpRecord.sourceAmount - }) + const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(receivingFxpRecord.counterPartyFspName, receivingFxpRecord.sourceCurrency) + if (proxyParticipantAccountDetails.participantCurrencyId) { + let isPositionChange = false + if (proxyParticipantAccountDetails.inScheme) { + isPositionChange = true + } else { + // We are not expecting this. FXP participant is a proxy and have an account in the sourceCurrency. + // In this case we need to check if Payer is also a proxy and have the same account as FXP. + const proxyParticipantAccountDetails2 = await ProxyCache.getProxyParticipantAccountDetails(transfer.payerFsp, receivingFxpRecord.sourceCurrency) + if (!proxyParticipantAccountDetails2.inScheme && (proxyParticipantAccountDetails.participantCurrencyId !== proxyParticipantAccountDetails2.participantCurrencyId)) { + isPositionChange = true + } + } + if (isPositionChange) { + result.positionChanges.push({ + isFxTransferStateChange: true, + commitRequestId: receivingFxpRecord.commitRequestId, + participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId, + amount: -receivingFxpRecord.sourceAmount + }) + } + } } // TODO: Remove entries from watchlist diff --git a/src/domain/position/binProcessor.js b/src/domain/position/binProcessor.js index d3fb0c3ff..945b124a1 100644 --- a/src/domain/position/binProcessor.js +++ b/src/domain/position/binProcessor.js @@ -56,6 +56,10 @@ const participantFacade = require('../../models/participant/facade') * @returns {results} - Returns a list of bins with results or throws an error if failed */ const processBins = async (bins, trx) => { + let notifyMessages = [] + let followupMessages = [] + let limitAlarms = [] + // Get transferIdList, reservedActionTransferIdList and commitRequestId for actions PREPARE, FX_PREPARE, FX_RESERVE, COMMIT and RESERVE const { transferIdList, reservedActionTransferIdList, commitRequestIdList } = await _getTransferIdList(bins) @@ -104,10 +108,6 @@ const processBins = async (bins, trx) => { reservedActionTransferIdList ) - let notifyMessages = [] - let followupMessages = [] - let limitAlarms = [] - // For each account-bin in the list for (const accountID in bins) { const accountBin = bins[accountID] diff --git a/src/handlers/transfers/handler.js b/src/handlers/transfers/handler.js index 4d2fde6ed..8a507e253 100644 --- a/src/handlers/transfers/handler.js +++ b/src/handlers/transfers/handler.js @@ -198,86 +198,89 @@ const processFulfilMessage = async (message, functionality, span) => { throw fspiopError // Lets validate FSPIOP Source & Destination Headers + // In interscheme scenario, we store proxy fsp id in transferParticipant table and hence we can't compare that data with fspiop headers in fulfil } else if ( - validActionsForRouteValidations.includes(action) && // Lets only check headers for specific actions that need checking (i.e. bulk should not since its already done elsewhere) - ( - (headers[Enum.Http.Headers.FSPIOP.SOURCE] && (headers[Enum.Http.Headers.FSPIOP.SOURCE].toLowerCase() !== transfer.payeeFsp.toLowerCase())) || - (headers[Enum.Http.Headers.FSPIOP.DESTINATION] && (headers[Enum.Http.Headers.FSPIOP.DESTINATION].toLowerCase() !== transfer.payerFsp.toLowerCase())) - ) + validActionsForRouteValidations.includes(action) // Lets only check headers for specific actions that need checking (i.e. bulk should not since its already done elsewhere) ) { - /** - * If fulfilment request is coming from a source not matching transfer payee fsp or destination not matching transfer payer fsp, - */ - Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorSourceNotMatchingTransferFSPs--${actionLetter}2`)) - - // Lets set a default non-matching error to fallback-on - let fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, 'FSP does not match one of the fsp-id\'s associated with a transfer on the Fulfil callback response') - - // Lets make the error specific if the PayeeFSP IDs do not match - if (headers[Enum.Http.Headers.FSPIOP.SOURCE].toLowerCase() !== transfer.payeeFsp.toLowerCase()) { - fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, `${Enum.Http.Headers.FSPIOP.SOURCE} does not match payee fsp on the Fulfil callback response`) - } - - // Lets make the error specific if the PayerFSP IDs do not match - if (headers[Enum.Http.Headers.FSPIOP.DESTINATION].toLowerCase() !== transfer.payerFsp.toLowerCase()) { - fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, `${Enum.Http.Headers.FSPIOP.DESTINATION} does not match payer fsp on the Fulfil callback response`) - } + // Check if the payerFsp and payeeFsp are proxies and if they are, skip validating headers + if ( + (headers[Enum.Http.Headers.FSPIOP.SOURCE] && !transfer.payeeIsProxy && (headers[Enum.Http.Headers.FSPIOP.SOURCE].toLowerCase() !== transfer.payeeFsp.toLowerCase())) || + (headers[Enum.Http.Headers.FSPIOP.DESTINATION] && !transfer.payerIsProxy && (headers[Enum.Http.Headers.FSPIOP.DESTINATION].toLowerCase() !== transfer.payerFsp.toLowerCase())) + ) { + /** + * If fulfilment request is coming from a source not matching transfer payee fsp or destination not matching transfer payer fsp, + */ + Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorSourceNotMatchingTransferFSPs--${actionLetter}2`)) - const apiFSPIOPError = fspiopError.toApiErrorObject(Config.ERROR_HANDLING) + // Lets set a default non-matching error to fallback-on + let fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, 'FSP does not match one of the fsp-id\'s associated with a transfer on the Fulfil callback response') - // Set the event details to map to an ABORT_VALIDATION event targeted to the Position Handler - const eventDetail = { - functionality: TransferEventType.POSITION, - action: TransferEventAction.ABORT_VALIDATION - } + // Lets make the error specific if the PayeeFSP IDs do not match + if (!transfer.payeeIsProxy && (headers[Enum.Http.Headers.FSPIOP.SOURCE].toLowerCase() !== transfer.payeeFsp.toLowerCase())) { + fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, `${Enum.Http.Headers.FSPIOP.SOURCE} does not match payee fsp on the Fulfil callback response`) + } - // Lets handle the abort validation and change the transfer state to reflect this - const transferAbortResult = await TransferService.handlePayeeResponse(transferId, payload, TransferEventAction.ABORT_VALIDATION, apiFSPIOPError) + // Lets make the error specific if the PayerFSP IDs do not match + if (!transfer.payerIsProxy && (headers[Enum.Http.Headers.FSPIOP.DESTINATION].toLowerCase() !== transfer.payerFsp.toLowerCase())) { + fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, `${Enum.Http.Headers.FSPIOP.DESTINATION} does not match payer fsp on the Fulfil callback response`) + } - /** - * TODO: BULK-Handle at BulkProcessingHandler (not in scope of #967) - * HOWTO: For regular transfers, send the fulfil from non-payee dfsp. - * Not sure if it will apply to bulk, as it could/should be captured - * at BulkPrepareHander. To be verified as part of future story. - */ + const apiFSPIOPError = fspiopError.toApiErrorObject(Config.ERROR_HANDLING) - // Publish message to Position Handler - // Key position abort with payer account id - const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, fromSwitch, toDestination: transfer.payerFsp, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME }) + // Set the event details to map to an ABORT_VALIDATION event targeted to the Position Handler + const eventDetail = { + functionality: TransferEventType.POSITION, + action: TransferEventAction.ABORT_VALIDATION + } - /** - * Send patch notification callback to original payee fsp if they asked for a a patch response. - */ - if (action === TransferEventAction.RESERVE) { - Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackReservedAborted--${actionLetter}3`)) + // Lets handle the abort validation and change the transfer state to reflect this + const transferAbortResult = await TransferService.handlePayeeResponse(transferId, payload, TransferEventAction.ABORT_VALIDATION, apiFSPIOPError) - // Set the event details to map to an RESERVE_ABORTED event targeted to the Notification Handler - const reserveAbortedEventDetail = { functionality: TransferEventType.NOTIFICATION, action: TransferEventAction.RESERVED_ABORTED } + /** + * TODO: BULK-Handle at BulkProcessingHandler (not in scope of #967) + * HOWTO: For regular transfers, send the fulfil from non-payee dfsp. + * Not sure if it will apply to bulk, as it could/should be captured + * at BulkPrepareHander. To be verified as part of future story. + */ - // Extract error information - const errorCode = apiFSPIOPError && apiFSPIOPError.errorInformation && apiFSPIOPError.errorInformation.errorCode - const errorDescription = apiFSPIOPError && apiFSPIOPError.errorInformation && apiFSPIOPError.errorInformation.errorDescription + // Publish message to Position Handler + // Key position abort with payer account id + const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, fromSwitch, toDestination: transfer.payerFsp, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME }) - // TODO: This should be handled by a PATCH /transfers/{id}/error callback in the future FSPIOP v1.2 specification, and instead we should just send the FSPIOP-Error instead! Ref: https://github.com/mojaloop/mojaloop-specification/issues/106. - const reservedAbortedPayload = { - transferId: transferAbortResult && transferAbortResult.id, - completedTimestamp: transferAbortResult && transferAbortResult.completedTimestamp && (new Date(Date.parse(transferAbortResult.completedTimestamp))).toISOString(), - transferState: TransferState.ABORTED, - extensionList: { // lets add the extension list to handle the limitation of the FSPIOP v1.1 specification by adding the error cause... - extension: [ - { - key: 'cause', - value: `${errorCode}: ${errorDescription}` - } - ] + /** + * Send patch notification callback to original payee fsp if they asked for a a patch response. + */ + if (action === TransferEventAction.RESERVE) { + Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackReservedAborted--${actionLetter}3`)) + + // Set the event details to map to an RESERVE_ABORTED event targeted to the Notification Handler + const reserveAbortedEventDetail = { functionality: TransferEventType.NOTIFICATION, action: TransferEventAction.RESERVED_ABORTED } + + // Extract error information + const errorCode = apiFSPIOPError && apiFSPIOPError.errorInformation && apiFSPIOPError.errorInformation.errorCode + const errorDescription = apiFSPIOPError && apiFSPIOPError.errorInformation && apiFSPIOPError.errorInformation.errorDescription + + // TODO: This should be handled by a PATCH /transfers/{id}/error callback in the future FSPIOP v1.2 specification, and instead we should just send the FSPIOP-Error instead! Ref: https://github.com/mojaloop/mojaloop-specification/issues/106. + const reservedAbortedPayload = { + transferId: transferAbortResult && transferAbortResult.id, + completedTimestamp: transferAbortResult && transferAbortResult.completedTimestamp && (new Date(Date.parse(transferAbortResult.completedTimestamp))).toISOString(), + transferState: TransferState.ABORTED, + extensionList: { // lets add the extension list to handle the limitation of the FSPIOP v1.1 specification by adding the error cause... + extension: [ + { + key: 'cause', + value: `${errorCode}: ${errorDescription}` + } + ] + } } + message.value.content.payload = reservedAbortedPayload + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail: reserveAbortedEventDetail, fromSwitch: true, toDestination: transfer.payeeFsp, hubName: Config.HUB_NAME }) } - message.value.content.payload = reservedAbortedPayload - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail: reserveAbortedEventDetail, fromSwitch: true, toDestination: transfer.payeeFsp, hubName: Config.HUB_NAME }) - } - throw apiFSPIOPError + throw apiFSPIOPError + } } // If execution continues after this point we are sure transfer exists and source matches payee fsp diff --git a/src/lib/proxyCache.js b/src/lib/proxyCache.js index 4d06490e4..40f50f357 100644 --- a/src/lib/proxyCache.js +++ b/src/lib/proxyCache.js @@ -1,5 +1,6 @@ 'use strict' const { createProxyCache, STORAGE_TYPES } = require('@mojaloop/inter-scheme-proxy-cache-lib') +const { Enum } = require('@mojaloop/central-services-shared') const ParticipantService = require('../../src/domain/participant') const Config = require('./config.js') @@ -54,11 +55,43 @@ const checkSameCreditorDebtorProxy = async (debtorDfspId, creditorDfspId) => { return debtorProxyId && creditorProxyId ? debtorProxyId === creditorProxyId : false } +const getProxyParticipantAccountDetails = async (fspName, currency) => { + const proxyLookupResult = await getFSPProxy(fspName) + if (proxyLookupResult.inScheme) { + const participantCurrency = await ParticipantService.getAccountByNameAndCurrency( + fspName, + currency, + Enum.Accounts.LedgerAccountType.POSITION + ) + return { + inScheme: true, + participantCurrencyId: participantCurrency?.participantCurrencyId || null + } + } else { + if (proxyLookupResult.proxyId) { + const participantCurrency = await ParticipantService.getAccountByNameAndCurrency( + proxyLookupResult.proxyId, + currency, + Enum.Accounts.LedgerAccountType.POSITION + ) + return { + inScheme: false, + participantCurrencyId: participantCurrency?.participantCurrencyId || null + } + } + return { + inScheme: false, + participantCurrencyId: null + } + } +} + module.exports = { reset, // for testing connect, disconnect, getCache, getFSPProxy, + getProxyParticipantAccountDetails, checkSameCreditorDebtorProxy } diff --git a/src/models/transfer/facade.js b/src/models/transfer/facade.js index 020201982..7a1e9ae4d 100644 --- a/src/models/transfer/facade.js +++ b/src/models/transfer/facade.js @@ -83,10 +83,12 @@ const getById = async (id) => { 'tp1.amount AS payerAmount', 'da.participantId AS payerParticipantId', 'da.name AS payerFsp', + 'da.isProxy AS payerIsProxy', 'pc2.participantCurrencyId AS payeeParticipantCurrencyId', 'tp2.amount AS payeeAmount', 'ca.participantId AS payeeParticipantId', 'ca.name AS payeeFsp', + 'ca.isProxy AS payeeIsProxy', 'tsc.transferStateChangeId', 'tsc.transferStateId AS transferState', 'tsc.reason AS reason', diff --git a/test/integration-override/handlers/transfers/handlers.test.js b/test/integration-override/handlers/transfers/handlers.test.js index ebd93c318..ef03c5823 100644 --- a/test/integration-override/handlers/transfers/handlers.test.js +++ b/test/integration-override/handlers/transfers/handlers.test.js @@ -193,6 +193,7 @@ const prepareTestData = async (dataObj) => { currency: dataObj.currencies[1], limit: { value: dataObj.fxp.limit } }) + await ParticipantFundsInOutHelper.recordFundsIn(payer.participant.name, payer.participantCurrencyId2, { currency: dataObj.amount.currency, amount: 10000 @@ -1101,6 +1102,332 @@ Test('Handlers test', async handlersTest => { transferProxyPrepare.end() }) + await handlersTest.test('transferProxyFulfil should', async transferProxyPrepare => { + await transferProxyPrepare.test(` + Scheme B: PUT /transfers call I.e. From: Payee DFSP → To: Proxy RB + Payee DFSP position account must be updated`, async (test) => { + const transferPrepareFrom = 'schemeAPayerFsp' + + const td = await prepareTestData(testData) + await ProxyCache.getCache().addDfspIdToProxyMapping(transferPrepareFrom, td.proxyRB.participant.name) + + // Prepare the transfer + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + + td.messageProtocolPrepare.content.from = transferPrepareFrom + td.messageProtocolPrepare.content.headers['fspiop-source'] = transferPrepareFrom + td.messageProtocolPrepare.content.payload.payerFsp = transferPrepareFrom + + await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'prepare', + // A position prepare message reserving the proxy of ProxyRB on it's XXX participant currency account + keyFilter: td.proxyRB.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position prepare message with key of fxp target currency account found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + // Fulfil the transfer + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger + + td.messageProtocolFulfil.content.to = transferPrepareFrom + td.messageProtocolFulfil.content.headers['fspiop-destination'] = transferPrepareFrom + + testConsumer.clearEvents() + await Producer.produceMessage(td.messageProtocolFulfil, td.topicConfTransferFulfil, fulfilConfig) + + try { + const positionFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'commit', + keyFilter: td.payee.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFulfil[0], 'Position fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + testConsumer.clearEvents() + test.end() + }) + + await transferProxyPrepare.test(` + Scheme R: PUT /transfers call I.e. From: Proxy RB → To: Proxy AR + If it is a normal transfer without currency conversion + ProxyRB account must be updated`, async (test) => { + const transferPrepareFrom = 'schemeAPayerFsp' + const transferPrepareTo = 'schemeBPayeeFsp' + + const td = await prepareTestData(testData) + await ProxyCache.getCache().addDfspIdToProxyMapping(transferPrepareFrom, td.proxyAR.participant.name) + await ProxyCache.getCache().addDfspIdToProxyMapping(transferPrepareTo, td.proxyRB.participant.name) + + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + + td.messageProtocolPrepare.content.from = transferPrepareFrom + td.messageProtocolPrepare.content.to = transferPrepareTo + td.messageProtocolPrepare.content.headers['fspiop-source'] = transferPrepareFrom + td.messageProtocolPrepare.content.headers['fspiop-destination'] = transferPrepareTo + td.messageProtocolPrepare.content.payload.payerFsp = transferPrepareFrom + td.messageProtocolPrepare.content.payload.payeeFsp = transferPrepareTo + + await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'prepare', + keyFilter: td.proxyAR.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position prepare message with key of proxyAR account found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + // Fulfil the transfer + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger + + td.messageProtocolFulfil.content.from = transferPrepareTo + td.messageProtocolFulfil.content.to = transferPrepareFrom + td.messageProtocolFulfil.content.headers['fspiop-source'] = transferPrepareTo + td.messageProtocolFulfil.content.headers['fspiop-destination'] = transferPrepareFrom + + testConsumer.clearEvents() + await Producer.produceMessage(td.messageProtocolFulfil, td.topicConfTransferFulfil, fulfilConfig) + + try { + const positionFulfil = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'commit', + keyFilter: td.proxyRB.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFulfil[0], 'Position fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + testConsumer.clearEvents() + test.end() + }) + + await transferProxyPrepare.test(` + Scheme R: PUT /transfers call I.e. From: Proxy RB → To: Proxy AR + If it is a FX transfer with currency conversion + FXP and ProxyRB account must be updated`, async (test) => { + const transferPrepareFrom = 'schemeAPayerFsp' + const transferPrepareTo = 'schemeBPayeeFsp' + + const td = await prepareTestData(testData) + await ProxyCache.getCache().addDfspIdToProxyMapping(transferPrepareFrom, td.proxyAR.participant.name) + await ProxyCache.getCache().addDfspIdToProxyMapping(transferPrepareTo, td.proxyRB.participant.name) + + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + + // FX Transfer from proxyAR to FXP + td.messageProtocolFxPrepare.content.from = transferPrepareFrom + td.messageProtocolFxPrepare.content.headers['fspiop-source'] = transferPrepareFrom + td.messageProtocolFxPrepare.content.payload.initiatingFsp = transferPrepareFrom + await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'fx-prepare', + // To be keyed with the Proxy AR participantCurrencyId + keyFilter: td.proxyAR.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position prepare message with proxyAR key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + // Create subsequent transfer + td.messageProtocolPrepare.content.from = transferPrepareFrom + td.messageProtocolPrepare.content.to = transferPrepareTo + td.messageProtocolPrepare.content.headers['fspiop-source'] = transferPrepareFrom + td.messageProtocolPrepare.content.headers['fspiop-destination'] = transferPrepareTo + td.messageProtocolPrepare.content.payload.payerFsp = transferPrepareFrom + td.messageProtocolPrepare.content.payload.payeeFsp = transferPrepareTo + + await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'prepare', + // A position prepare message reserving the FXP's targeted currency account should be created + keyFilter: td.fxp.participantCurrencyIdSecondary.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position prepare message with key of fxp target currency account found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + // Fulfil the transfer + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger + + td.messageProtocolFulfil.content.from = transferPrepareTo + td.messageProtocolFulfil.content.to = transferPrepareFrom + td.messageProtocolFulfil.content.headers['fspiop-source'] = transferPrepareTo + td.messageProtocolFulfil.content.headers['fspiop-destination'] = transferPrepareFrom + + testConsumer.clearEvents() + await Producer.produceMessage(td.messageProtocolFulfil, td.topicConfTransferFulfil, fulfilConfig) + + try { + const positionFulfil1 = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'commit', + keyFilter: td.fxp.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + const positionFulfil2 = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'commit', + keyFilter: td.proxyRB.participantCurrencyIdSecondary.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFulfil1[0], 'Position fulfil message with key found') + test.ok(positionFulfil2[0], 'Position fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + testConsumer.clearEvents() + test.end() + }) + + await transferProxyPrepare.test(` + Scheme A: PUT /transfers call I.e. From: Proxy AR → To: Payer FSP + If it is a FX transfer with currency conversion + PayerFSP and ProxyAR account must be updated`, async (test) => { + const transferPrepareTo = 'schemeBPayeeFsp' + const fxTransferPrepareTo = 'schemeRFxp' + + const td = await prepareTestData(testData) + await ProxyCache.getCache().addDfspIdToProxyMapping(fxTransferPrepareTo, td.proxyAR.participant.name) + await ProxyCache.getCache().addDfspIdToProxyMapping(transferPrepareTo, td.proxyAR.participant.name) + + const prepareConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + prepareConfig.logger = Logger + + // FX Transfer from payer to proxyAR + td.messageProtocolFxPrepare.content.to = fxTransferPrepareTo + td.messageProtocolFxPrepare.content.headers['fspiop-destination'] = fxTransferPrepareTo + td.messageProtocolFxPrepare.content.payload.counterPartyFsp = fxTransferPrepareTo + await Producer.produceMessage(td.messageProtocolFxPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'fx-prepare', + // To be keyed with the PayerFSP participantCurrencyId + keyFilter: td.payer.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position prepare message with proxyAR key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + // Create subsequent transfer + td.messageProtocolPrepare.content.to = transferPrepareTo + td.messageProtocolPrepare.content.headers['fspiop-destination'] = transferPrepareTo + td.messageProtocolPrepare.content.payload.payeeFsp = transferPrepareTo + + await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, prepareConfig) + + try { + const positionPrepare = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'prepare', + // A position prepare message without need for any position changes should be created (key 0) + keyFilter: '0' + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionPrepare[0], 'Position prepare message with key of fxp target currency account found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } + + // TODO: It seems there is an issue in position handler. Its not processing the messages with key 0. + // It should change the state of the transfer to RESERVED in the prepare step. + // Until the issue with position handler is resolved. Commenting the following test. + // // Fulfil the transfer + // const fulfilConfig = Utility.getKafkaConfig( + // Config.KAFKA_CONFIG, + // Enum.Kafka.Config.PRODUCER, + // TransferEventType.TRANSFER.toUpperCase(), + // TransferEventType.FULFIL.toUpperCase()) + // fulfilConfig.logger = Logger + + // td.messageProtocolFulfil.content.from = transferPrepareTo + // td.messageProtocolFulfil.content.headers['fspiop-source'] = transferPrepareTo + // testConsumer.clearEvents() + // await Producer.produceMessage(td.messageProtocolFulfil, td.topicConfTransferFulfil, fulfilConfig) + // try { + // const positionFulfil1 = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + // topicFilter: 'topic-transfer-position-batch', + // action: 'commit', + // keyFilter: td.proxyAR.participantCurrencyId.toString() + // }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + // test.ok(positionFulfil1[0], 'Position fulfil message with key found') + // } catch (err) { + // test.notOk('Error should not be thrown') + // console.error(err) + // } + + testConsumer.clearEvents() + test.end() + }) + + transferProxyPrepare.end() + }) + await handlersTest.test('teardown', async (assert) => { try { await Handlers.timeouts.stop() diff --git a/test/unit/domain/fx/cyril.test.js b/test/unit/domain/fx/cyril.test.js index bc4a227a2..7555c7d11 100644 --- a/test/unit/domain/fx/cyril.test.js +++ b/test/unit/domain/fx/cyril.test.js @@ -8,6 +8,9 @@ const { Enum } = require('@mojaloop/central-services-shared') const TransferModel = require('../../../../src/models/transfer/transfer') const ParticipantFacade = require('../../../../src/models/participant/facade') const { fxTransfer, watchList } = require('../../../../src/models/fxTransfer') +const ProxyCache = require('../../../../src/lib/proxyCache') + +const defaultGetProxyParticipantAccountDetailsResponse = { inScheme: true, participantCurrencyId: 1 } Test('Cyril', cyrilTest => { let sandbox @@ -20,6 +23,7 @@ Test('Cyril', cyrilTest => { sandbox.stub(fxTransfer) sandbox.stub(TransferModel) sandbox.stub(ParticipantFacade) + sandbox.stub(ProxyCache) payload = { transferId: 'b51ec534-ee48-4575-b6a9-ead2955b8999', payerFsp: 'dfsp1', @@ -316,14 +320,15 @@ Test('Cyril', cyrilTest => { participantName: 'fx_dfsp1', isActive: 1 })) + ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve(defaultGetProxyParticipantAccountDetailsResponse)) const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) - test.ok(ParticipantFacade.getByNameAndCurrency.calledWith( + test.ok(ProxyCache.getProxyParticipantAccountDetails.calledWith( 'dfsp2', - fxPayload.targetAmount.currency, - Enum.Accounts.LedgerAccountType.POSITION + fxPayload.targetAmount.currency )) + test.deepEqual(result, { isFx: true, positionChanges: [{ @@ -377,6 +382,7 @@ Test('Cyril', cyrilTest => { participantName: 'payeeFsp', isActive: 1 })) + ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve(defaultGetProxyParticipantAccountDetailsResponse)) const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) @@ -442,6 +448,7 @@ Test('Cyril', cyrilTest => { participantName: 'payeeFsp', isActive: 1 })) + ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve(defaultGetProxyParticipantAccountDetailsResponse)) const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) @@ -478,6 +485,396 @@ Test('Cyril', cyrilTest => { test.end() } }) + + processFulfilMessageTest.test('process watchlist with only payer conversion found, but payee is a proxy and have no account in the currency', async (test) => { + try { + watchList.getItemsInWatchListByDeterminingTransferId.returns(Promise.resolve( + [{ + commitRequestId: fxPayload.commitRequestId, + determiningTransferId: fxPayload.determiningTransferId, + fxTransferTypeId: Enum.Fx.FxTransferType.PAYER_CONVERSION, + createdDate: new Date() + }] + )) + fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + { + initiatingFspParticipantId: 2, + targetAmount: fxPayload.targetAmount.amount, + commitRequestId: fxPayload.commitRequestId, + counterPartyFspSourceParticipantCurrencyId: 1, + counterPartyFspTargetParticipantCurrencyId: 2, + sourceAmount: fxPayload.sourceAmount.amount, + targetCurrency: fxPayload.targetAmount.currency + } + )) + ParticipantFacade.getByNameAndCurrency.returns(Promise.resolve({ + participantId: 1, + participantCurrencyId: 1, + participantName: 'fx_dfsp1', + isActive: 1 + })) + ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve({ inScheme: false, participantCurrencyId: null })) + const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) + test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(ProxyCache.getProxyParticipantAccountDetails.calledWith( + 'dfsp2', + fxPayload.targetAmount.currency + )) + + test.deepEqual(result, { + isFx: true, + positionChanges: [], + patchNotifications: [] + }) + test.pass('Error not thrown') + test.end() + } catch (e) { + console.log(e) + test.fail('Error Thrown') + test.end() + } + }) + + processFulfilMessageTest.test('process watchlist with only payer conversion found, but payee is a proxy and have account in the currency somehow', async (test) => { + try { + watchList.getItemsInWatchListByDeterminingTransferId.returns(Promise.resolve( + [{ + commitRequestId: fxPayload.commitRequestId, + determiningTransferId: fxPayload.determiningTransferId, + fxTransferTypeId: Enum.Fx.FxTransferType.PAYER_CONVERSION, + createdDate: new Date() + }] + )) + fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + { + initiatingFspParticipantId: 2, + targetAmount: fxPayload.targetAmount.amount, + commitRequestId: fxPayload.commitRequestId, + counterPartyFspSourceParticipantCurrencyId: 1, + counterPartyFspTargetParticipantCurrencyId: 2, + sourceAmount: fxPayload.sourceAmount.amount, + targetCurrency: fxPayload.targetAmount.currency + } + )) + ParticipantFacade.getByNameAndCurrency.returns(Promise.resolve({ + participantId: 1, + participantCurrencyId: 1, + participantName: 'fx_dfsp1', + isActive: 1 + })) + ProxyCache.getProxyParticipantAccountDetails.onCall(0).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 234 })) // FXP Source Currency + ProxyCache.getProxyParticipantAccountDetails.onCall(1).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 456 })) // Payee Target Currency + ProxyCache.getProxyParticipantAccountDetails.onCall(2).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 345 })) // FXP Target Currency + const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) + test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(ProxyCache.getProxyParticipantAccountDetails.calledWith( + 'dfsp2', + fxPayload.targetAmount.currency + )) + + test.deepEqual(result, { + isFx: true, + positionChanges: [ + { + isFxTransferStateChange: true, + commitRequestId: '88622a75-5bde-4da4-a6cc-f4cd23b268c4', + participantCurrencyId: 234, + amount: -433.88 + }, + { + isFxTransferStateChange: false, + transferId: 'b51ec534-ee48-4575-b6a9-ead2955b8999', + participantCurrencyId: 456, + amount: -200 + } + ], + patchNotifications: [] + }) + test.pass('Error not thrown') + test.end() + } catch (e) { + console.log(e) + test.fail('Error Thrown') + test.end() + } + }) + + processFulfilMessageTest.test('process watchlist with only payer conversion found, but payee is a proxy and have account in the currency somehow and it is same as fxp target account', async (test) => { + try { + watchList.getItemsInWatchListByDeterminingTransferId.returns(Promise.resolve( + [{ + commitRequestId: fxPayload.commitRequestId, + determiningTransferId: fxPayload.determiningTransferId, + fxTransferTypeId: Enum.Fx.FxTransferType.PAYER_CONVERSION, + createdDate: new Date() + }] + )) + fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + { + initiatingFspParticipantId: 2, + targetAmount: fxPayload.targetAmount.amount, + commitRequestId: fxPayload.commitRequestId, + counterPartyFspSourceParticipantCurrencyId: 1, + counterPartyFspTargetParticipantCurrencyId: 2, + sourceAmount: fxPayload.sourceAmount.amount, + targetCurrency: fxPayload.targetAmount.currency + } + )) + ParticipantFacade.getByNameAndCurrency.returns(Promise.resolve({ + participantId: 1, + participantCurrencyId: 1, + participantName: 'fx_dfsp1', + isActive: 1 + })) + ProxyCache.getProxyParticipantAccountDetails.onCall(0).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 234 })) // FXP Source Currency + ProxyCache.getProxyParticipantAccountDetails.onCall(1).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 456 })) // Payee Target Currency + ProxyCache.getProxyParticipantAccountDetails.onCall(2).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 456 })) // FXP Target Currency + const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) + test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(ProxyCache.getProxyParticipantAccountDetails.calledWith( + 'dfsp2', + fxPayload.targetAmount.currency + )) + + test.deepEqual(result, { + isFx: true, + positionChanges: [ + { + isFxTransferStateChange: true, + commitRequestId: '88622a75-5bde-4da4-a6cc-f4cd23b268c4', + participantCurrencyId: 234, + amount: -433.88 + } + ], + patchNotifications: [] + }) + test.pass('Error not thrown') + test.end() + } catch (e) { + console.log(e) + test.fail('Error Thrown') + test.end() + } + }) + + processFulfilMessageTest.test('process watchlist with only payee conversion found but fxp is proxy and have no account', async (test) => { + try { + watchList.getItemsInWatchListByDeterminingTransferId.returns(Promise.resolve( + [{ + commitRequestId: fxPayload.commitRequestId, + determiningTransferId: fxPayload.determiningTransferId, + fxTransferTypeId: Enum.Fx.FxTransferType.PAYEE_CONVERSION, + createdDate: new Date() + }] + )) + fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + { + initiatingFspParticipantId: 1, + targetAmount: fxPayload.targetAmount.amount, + commitRequestId: fxPayload.commitRequestId, + counterPartyFspSourceParticipantCurrencyId: 1, + counterPartyFspTargetParticipantCurrencyId: 2, + sourceAmount: fxPayload.sourceAmount.amount, + targetCurrency: fxPayload.targetAmount.currency + } + )) + ParticipantFacade.getByNameAndCurrency.returns(Promise.resolve({ + participantId: 1, + participantCurrencyId: 1, + participantName: 'payeeFsp', + isActive: 1 + })) + ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve({ inScheme: false, participantCurrencyId: null })) + const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) + test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.deepEqual(result, { + isFx: true, + positionChanges: [], + patchNotifications: [] + } + ) + test.pass('Error not thrown') + test.end() + } catch (e) { + console.log(e) + test.fail('Error Thrown') + test.end() + } + }) + + processFulfilMessageTest.test('process watchlist with only payee conversion found but fxp is proxy and have account in source currency somehow', async (test) => { + try { + watchList.getItemsInWatchListByDeterminingTransferId.returns(Promise.resolve( + [{ + commitRequestId: fxPayload.commitRequestId, + determiningTransferId: fxPayload.determiningTransferId, + fxTransferTypeId: Enum.Fx.FxTransferType.PAYEE_CONVERSION, + createdDate: new Date() + }] + )) + fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + { + initiatingFspParticipantId: 1, + targetAmount: fxPayload.targetAmount.amount, + commitRequestId: fxPayload.commitRequestId, + counterPartyFspSourceParticipantCurrencyId: 1, + counterPartyFspTargetParticipantCurrencyId: 2, + sourceAmount: fxPayload.sourceAmount.amount, + targetCurrency: fxPayload.targetAmount.currency + } + )) + ParticipantFacade.getByNameAndCurrency.returns(Promise.resolve({ + participantId: 1, + participantCurrencyId: 1, + participantName: 'payeeFsp', + isActive: 1 + })) + ProxyCache.getProxyParticipantAccountDetails.onCall(0).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 456 })) // Payee Target Currency + ProxyCache.getProxyParticipantAccountDetails.onCall(1).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 234 })) // FXP Source Currency + ProxyCache.getProxyParticipantAccountDetails.onCall(2).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 123 })) // Payer Source Currency + const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) + test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.deepEqual(result, { + isFx: true, + positionChanges: [ + { + isFxTransferStateChange: false, + transferId: 'b51ec534-ee48-4575-b6a9-ead2955b8999', + participantCurrencyId: 456, + amount: -200 + }, + { + isFxTransferStateChange: true, + commitRequestId: '88622a75-5bde-4da4-a6cc-f4cd23b268c4', + participantCurrencyId: 234, + amount: -433.88 + } + ], + patchNotifications: [] + } + ) + test.pass('Error not thrown') + test.end() + } catch (e) { + console.log(e) + test.fail('Error Thrown') + test.end() + } + }) + + processFulfilMessageTest.test('process watchlist with only payee conversion found but fxp is proxy and have account in source currency somehow and it is same as payer account', async (test) => { + try { + watchList.getItemsInWatchListByDeterminingTransferId.returns(Promise.resolve( + [{ + commitRequestId: fxPayload.commitRequestId, + determiningTransferId: fxPayload.determiningTransferId, + fxTransferTypeId: Enum.Fx.FxTransferType.PAYEE_CONVERSION, + createdDate: new Date() + }] + )) + fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + { + initiatingFspParticipantId: 1, + targetAmount: fxPayload.targetAmount.amount, + commitRequestId: fxPayload.commitRequestId, + counterPartyFspSourceParticipantCurrencyId: 1, + counterPartyFspTargetParticipantCurrencyId: 2, + sourceAmount: fxPayload.sourceAmount.amount, + targetCurrency: fxPayload.targetAmount.currency + } + )) + ParticipantFacade.getByNameAndCurrency.returns(Promise.resolve({ + participantId: 1, + participantCurrencyId: 1, + participantName: 'payeeFsp', + isActive: 1 + })) + ProxyCache.getProxyParticipantAccountDetails.onCall(0).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 456 })) // Payee Target Currency + ProxyCache.getProxyParticipantAccountDetails.onCall(1).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 234 })) // FXP Source Currency + ProxyCache.getProxyParticipantAccountDetails.onCall(2).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 234 })) // Payer Source Currency + const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) + test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.deepEqual(result, { + isFx: true, + positionChanges: [ + { + isFxTransferStateChange: false, + transferId: 'b51ec534-ee48-4575-b6a9-ead2955b8999', + participantCurrencyId: 456, + amount: -200 + } + ], + patchNotifications: [] + } + ) + test.pass('Error not thrown') + test.end() + } catch (e) { + console.log(e) + test.fail('Error Thrown') + test.end() + } + }) + + processFulfilMessageTest.test('process watchlist with both payer and payee conversion found, but derived currencyId is null', async (test) => { + try { + watchList.getItemsInWatchListByDeterminingTransferId.returns(Promise.resolve( + [ + { + commitRequestId: fxPayload.commitRequestId, + determiningTransferId: fxPayload.determiningTransferId, + fxTransferTypeId: Enum.Fx.FxTransferType.PAYEE_CONVERSION, + createdDate: new Date() + }, + { + commitRequestId: fxPayload.commitRequestId, + determiningTransferId: fxPayload.determiningTransferId, + fxTransferTypeId: Enum.Fx.FxTransferType.PAYER_CONVERSION, + createdDate: new Date() + } + ] + )) + fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + { + initiatingFspParticipantId: 1, + targetAmount: fxPayload.targetAmount.amount, + commitRequestId: fxPayload.commitRequestId, + counterPartyFspSourceParticipantCurrencyId: 1, + counterPartyFspTargetParticipantCurrencyId: 2, + sourceAmount: fxPayload.sourceAmount.amount, + targetCurrency: fxPayload.targetAmount.currency + } + )) + ParticipantFacade.getByNameAndCurrency.returns(Promise.resolve({ + participantId: 1, + participantCurrencyId: 1, + participantName: 'payeeFsp', + isActive: 1 + })) + ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve({ inScheme: true, participantCurrencyId: null })) + const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) + test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.deepEqual(result, { + isFx: true, + positionChanges: [], + patchNotifications: [] + } + ) + test.pass('Error not thrown') + test.end() + } catch (e) { + console.log(e) + test.fail('Error Thrown') + test.end() + } + }) processFulfilMessageTest.end() }) diff --git a/test/unit/handlers/transfers/handler.test.js b/test/unit/handlers/transfers/handler.test.js index 173345fee..32363fcfc 100644 --- a/test/unit/handlers/transfers/handler.test.js +++ b/test/unit/handlers/transfers/handler.test.js @@ -266,6 +266,8 @@ Test('Transfer handler', transferHandlerTest => { connect: sandbox.stub(), disconnect: sandbox.stub() }) + sandbox.stub(ProxyCache, 'getProxyParticipantAccountDetails').resolves({ inScheme: true, participantCurrencyId: 1 }) + sandbox.stub(ProxyCache, 'checkSameCreditorDebtorProxy').resolves(false) const stubs = mocks.createTracerStub(sandbox) SpanStub = stubs.SpanStub diff --git a/test/unit/lib/proxyCache.test.js b/test/unit/lib/proxyCache.test.js index 9b5db4eeb..3aa637132 100644 --- a/test/unit/lib/proxyCache.test.js +++ b/test/unit/lib/proxyCache.test.js @@ -135,5 +135,48 @@ Test('Proxy Cache test', async (proxyCacheTest) => { checkSameCreditorDebtorProxyTest.end() }) + await proxyCacheTest.test('getProxyParticipantAccountDetails', async (getProxyParticipantAccountDetailsTest) => { + await getProxyParticipantAccountDetailsTest.test('resolve participantCurrencyId if participant is in scheme', async (test) => { + ParticipantService.getByName.returns(Promise.resolve({ participantId: 1 })) + ParticipantService.getAccountByNameAndCurrency.returns(Promise.resolve({ participantCurrencyId: 123 })) + const result = await ProxyCache.getProxyParticipantAccountDetails('nonExistingDfspId1', 'XXX') + test.deepEqual(result, { inScheme: true, participantCurrencyId: 123 }) + test.end() + }) + + await getProxyParticipantAccountDetailsTest.test('resolve participantCurrencyId of the proxy if participant is not in scheme', async (test) => { + ParticipantService.getByName.returns(Promise.resolve(null)) + ParticipantService.getAccountByNameAndCurrency.returns(Promise.resolve({ participantCurrencyId: 456 })) + const result = await ProxyCache.getProxyParticipantAccountDetails('existingDfspId1', 'XXX') + test.deepEqual(result, { inScheme: false, participantCurrencyId: 456 }) + test.end() + }) + + await getProxyParticipantAccountDetailsTest.test('resolve null if participant is in scheme and there is no account', async (test) => { + ParticipantService.getByName.returns(Promise.resolve({ participantId: 1 })) + ParticipantService.getAccountByNameAndCurrency.returns(Promise.resolve(null)) + const result = await ProxyCache.getProxyParticipantAccountDetails('nonExistingDfspId1', 'XXX') + test.deepEqual(result, { inScheme: true, participantCurrencyId: null }) + test.end() + }) + + await getProxyParticipantAccountDetailsTest.test('resolve null if participant is not in scheme and also there is no proxy in cache', async (test) => { + ParticipantService.getByName.returns(Promise.resolve(null)) + const result = await ProxyCache.getProxyParticipantAccountDetails('nonExistingDfspId1', 'XXX') + test.deepEqual(result, { inScheme: false, participantCurrencyId: null }) + test.end() + }) + + await getProxyParticipantAccountDetailsTest.test('resolve null if participant is not in scheme and proxy exists but no account', async (test) => { + ParticipantService.getByName.returns(Promise.resolve(null)) + ParticipantService.getAccountByNameAndCurrency.returns(Promise.resolve(null)) + const result = await ProxyCache.getProxyParticipantAccountDetails('existingDfspId1', 'XXX') + test.deepEqual(result, { inScheme: false, participantCurrencyId: null }) + test.end() + }) + + getProxyParticipantAccountDetailsTest.end() + }) + proxyCacheTest.end() }) diff --git a/test/unit/models/transfer/facade.test.js b/test/unit/models/transfer/facade.test.js index 887f17a39..92ff125d8 100644 --- a/test/unit/models/transfer/facade.test.js +++ b/test/unit/models/transfer/facade.test.js @@ -244,10 +244,12 @@ Test('Transfer facade', async (transferFacadeTest) => { 'tp1.amount AS payerAmount', 'da.participantId AS payerParticipantId', 'da.name AS payerFsp', + 'da.isProxy AS payerIsProxy', 'pc2.participantCurrencyId AS payeeParticipantCurrencyId', 'tp2.amount AS payeeAmount', 'ca.participantId AS payeeParticipantId', 'ca.name AS payeeFsp', + 'ca.isProxy AS payeeIsProxy', 'tsc.transferStateChangeId', 'tsc.transferStateId AS transferState', 'tsc.reason AS reason',