From 11e68c6d659a0270f84e3cb5ed17911f7dc0b8ec Mon Sep 17 00:00:00 2001 From: Steven Oderayi Date: Mon, 24 Jun 2024 12:49:34 +0100 Subject: [PATCH] feat: parameterize switch id --- docker/ml-api-adapter/default.json | 5 +- package-lock.json | 42 ++-- package.json | 6 +- src/domain/position/fulfil.js | 4 +- src/domain/position/fx-fulfil.js | 4 +- src/domain/position/fx-prepare.js | 12 +- src/domain/position/prepare.js | 12 +- src/handlers/bulk/fulfil/handler.js | 10 +- src/handlers/bulk/get/handler.js | 10 +- src/handlers/bulk/prepare/handler.js | 18 +- src/handlers/bulk/processing/handler.js | 15 +- src/handlers/bulk/shared/validator.js | 2 +- src/handlers/positions/handler.js | 15 +- src/handlers/positions/handlerBatch.js | 2 +- src/handlers/timeouts/handler.js | 10 +- src/handlers/transfers/FxFulfilService.js | 226 +++++++++++++++---- src/handlers/transfers/handler.js | 46 ++-- src/handlers/transfers/prepare.js | 16 +- test/fixtures.js | 3 +- test/unit/domain/position/fulfil.test.js | 7 +- test/unit/domain/position/fx-fulfil.test.js | 3 +- test/unit/domain/position/fx-prepare.test.js | 17 +- test/unit/domain/position/prepare.test.js | 21 +- 23 files changed, 327 insertions(+), 179 deletions(-) diff --git a/docker/ml-api-adapter/default.json b/docker/ml-api-adapter/default.json index e701c2891..d58b20fce 100644 --- a/docker/ml-api-adapter/default.json +++ b/docker/ml-api-adapter/default.json @@ -1,4 +1,8 @@ { + "HUB_PARTICIPANT": { + "ID": 1, + "NAME": "Hub" + }, "PORT": 3000, "HOSTNAME": "http://ml-api-adapter", "ENDPOINT_SOURCE_URL": "http://host.docker.internal:3001", @@ -13,7 +17,6 @@ }, "JWS": { "JWS_SIGN": false, - "FSPIOP_SOURCE_TO_SIGN": "switch", "JWS_SIGNING_KEY_PATH": "secrets/jwsSigningKey.key" } }, diff --git a/package-lock.json b/package-lock.json index a4721e444..f46bc84c3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,7 +19,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.3.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "18.4.0-snapshot.17", + "@mojaloop/central-services-shared": "18.5.0-snapshot.2", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.5", "@mojaloop/event-sdk": "14.1.1", @@ -36,7 +36,7 @@ "docdash": "2.0.2", "event-stream": "4.0.1", "five-bells-condition": "5.0.1", - "glob": "10.4.1", + "glob": "10.4.2", "hapi-auth-basic": "5.0.0", "hapi-auth-bearer-token": "8.0.0", "hapi-swagger": "17.2.1", @@ -54,7 +54,7 @@ "get-port": "5.1.1", "jsdoc": "4.0.3", "jsonpath": "1.1.1", - "nodemon": "3.1.3", + "nodemon": "3.1.4", "npm-check-updates": "16.14.20", "nyc": "17.0.0", "pre-commit": "1.2.2", @@ -1577,9 +1577,9 @@ } }, "node_modules/@mojaloop/central-services-shared": { - "version": "18.4.0-snapshot.17", - "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.4.0-snapshot.17.tgz", - "integrity": "sha512-GEJhxLi+t7t+y7KqAYv6RsalW5MFavmmAY3Qu12Zf+GgU/W+Ln+a4R5kxWjBLAnvPKwYPdppm0c6F/a44Gfx5g==", + "version": "18.5.0-snapshot.2", + "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-18.5.0-snapshot.2.tgz", + "integrity": "sha512-GbLb8mk5wqEV/5LlPg9F0eRBh1AKHeLXNTXwlLTP5NDSYlwFUTRVMJ+7R5QWkDbyR7O1BnhNoaY06i62/nq/QA==", "dependencies": { "@hapi/catbox": "12.1.1", "@hapi/catbox-memory": "5.0.1", @@ -1597,13 +1597,13 @@ "shins": "2.6.0", "uuid4": "2.0.3", "widdershins": "^4.0.1", - "yaml": "2.4.2" + "yaml": "2.4.5" }, "peerDependencies": { - 
"@mojaloop/central-services-error-handling": ">=12.x.x", + "@mojaloop/central-services-error-handling": ">=13.x.x", "@mojaloop/central-services-logger": ">=11.x.x", "@mojaloop/central-services-metrics": ">=12.x.x", - "@mojaloop/event-sdk": ">=14.x.x", + "@mojaloop/event-sdk": ">=14.1.1", "ajv": "8.x.x", "ajv-keywords": "5.x.x" }, @@ -7161,14 +7161,15 @@ "dev": true }, "node_modules/glob": { - "version": "10.4.1", - "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.1.tgz", - "integrity": "sha512-2jelhlq3E4ho74ZyVLN03oKdAZVUa6UDZzFLVH1H7dnoax+y9qyaq8zBkfDIggjniU19z0wU18y16jMB2eyVIw==", + "version": "10.4.2", + "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.2.tgz", + "integrity": "sha512-GwMlUF6PkPo3Gk21UxkCohOv0PLcIXVtKyLlpEI28R/cO/4eNOdmLk3CMW1wROV/WR/EsZOWAfBbBOqYvs88/w==", "dependencies": { "foreground-child": "^3.1.0", "jackspeak": "^3.1.2", "minimatch": "^9.0.4", "minipass": "^7.1.2", + "package-json-from-dist": "^1.0.0", "path-scurry": "^1.11.1" }, "bin": { @@ -11172,9 +11173,9 @@ "dev": true }, "node_modules/nodemon": { - "version": "3.1.3", - "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-3.1.3.tgz", - "integrity": "sha512-m4Vqs+APdKzDFpuaL9F9EVOF85+h070FnkHVEoU4+rmT6Vw0bmNl7s61VEkY/cJkL7RCv1p4urnUDUMrS5rk2w==", + "version": "3.1.4", + "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-3.1.4.tgz", + "integrity": "sha512-wjPBbFhtpJwmIeY2yP7QF+UKzPfltVGtfce1g/bB15/8vCGZj8uxD62b/b9M9/WVgme0NZudpownKN+c0plXlQ==", "dev": true, "dependencies": { "chokidar": "^3.5.2", @@ -12414,6 +12415,11 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/package-json-from-dist": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/package-json-from-dist/-/package-json-from-dist-1.0.0.tgz", + "integrity": "sha512-dATvCeZN/8wQsGywez1mzHtTlP22H8OEfPrVMLNr4/eGa+ijtLn/6M5f0dY8UKNrC2O9UCU6SSoG3qRKnt7STw==" + }, "node_modules/pacote": { "version": "15.2.0", "resolved": "https://registry.npmjs.org/pacote/-/pacote-15.2.0.tgz", @@ -17660,9 +17666,9 @@ "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" }, "node_modules/yaml": { - "version": "2.4.2", - "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.4.2.tgz", - "integrity": "sha512-B3VqDZ+JAg1nZpaEmWtTXUlBneoGx6CPM9b0TENK6aoSu5t73dItudwdgmi6tHlIZZId4dZ9skcAQ2UbcyAeVA==", + "version": "2.4.5", + "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.4.5.tgz", + "integrity": "sha512-aBx2bnqDzVOyNKfsysjA2ms5ZlnjSAW2eG3/L5G/CSujfjLJTJsEw1bGw8kCf04KodQWk1pxlGnZ56CRxiawmg==", "bin": { "yaml": "bin.mjs" }, diff --git a/package.json b/package.json index 44a5a7180..6acc6dbff 100644 --- a/package.json +++ b/package.json @@ -101,7 +101,7 @@ "@mojaloop/central-services-health": "15.0.0", "@mojaloop/central-services-logger": "11.3.1", "@mojaloop/central-services-metrics": "12.0.8", - "@mojaloop/central-services-shared": "18.4.0-snapshot.17", + "@mojaloop/central-services-shared": "18.5.0-snapshot.2", "@mojaloop/central-services-stream": "11.3.1", "@mojaloop/database-lib": "11.0.5", "@mojaloop/event-sdk": "14.1.1", @@ -118,7 +118,7 @@ "docdash": "2.0.2", "event-stream": "4.0.1", "five-bells-condition": "5.0.1", - "glob": "10.4.1", + "glob": "10.4.2", "hapi-auth-basic": "5.0.0", "hapi-auth-bearer-token": "8.0.0", "hapi-swagger": "17.2.1", @@ -139,7 +139,7 @@ "get-port": "5.1.1", "jsdoc": "4.0.3", "jsonpath": "1.1.1", - "nodemon": "3.1.3", + "nodemon": "3.1.4", "npm-check-updates": "16.14.20", "nyc": "17.0.0", "pre-commit": "1.2.2", diff --git 
a/src/domain/position/fulfil.js b/src/domain/position/fulfil.js index eb95323a8..f8d0d82c5 100644 --- a/src/domain/position/fulfil.js +++ b/src/domain/position/fulfil.js @@ -129,7 +129,7 @@ const _handleIncorrectTransferState = (binItem, payeeFsp, transferId, accumulate // set destination to payeefsp and source to switch const headers = { ...binItem.message.value.content.headers } headers[Enum.Http.Headers.FSPIOP.DESTINATION] = payeeFsp - headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value + headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME delete headers['content-length'] const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError( @@ -151,7 +151,7 @@ const _handleIncorrectTransferState = (binItem, payeeFsp, transferId, accumulate return Utility.StreamingProtocol.createMessage( transferId, payeeFsp, - Enum.Http.Headers.FSPIOP.SWITCH.value, + Config.HUB_NAME, metadata, headers, fspiopError, diff --git a/src/domain/position/fx-fulfil.js b/src/domain/position/fx-fulfil.js index 284014c28..188dddda7 100644 --- a/src/domain/position/fx-fulfil.js +++ b/src/domain/position/fx-fulfil.js @@ -41,7 +41,7 @@ const processPositionFxFulfilBin = async ( // set destination to counterPartyFsp and source to switch const headers = { ...binItem.message.value.content.headers } headers[Enum.Http.Headers.FSPIOP.DESTINATION] = counterPartyFsp - headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value + headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME delete headers['content-length'] // TODO: Confirm if this setting transferStateId to ABORTED_REJECTED is correct. There is no such logic in the fulfil handler. @@ -67,7 +67,7 @@ const processPositionFxFulfilBin = async ( resultMessage = Utility.StreamingProtocol.createMessage( commitRequestId, counterPartyFsp, - Enum.Http.Headers.FSPIOP.SWITCH.value, + Config.HUB_NAME, metadata, headers, fspiopError, diff --git a/src/domain/position/fx-prepare.js b/src/domain/position/fx-prepare.js index 6f3758e00..f35d0b876 100644 --- a/src/domain/position/fx-prepare.js +++ b/src/domain/position/fx-prepare.js @@ -65,7 +65,7 @@ const processFxPositionPrepareBin = async ( // set destination to initiatingFsp and source to switch const headers = { ...binItem.message.value.content.headers } headers[Enum.Http.Headers.FSPIOP.DESTINATION] = fxTransfer.initiatingFsp - headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value + headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME delete headers['content-length'] const fspiopError = ErrorHandler.Factory.createFSPIOPError( @@ -88,7 +88,7 @@ const processFxPositionPrepareBin = async ( resultMessage = Utility.StreamingProtocol.createMessage( fxTransfer.commitRequestId, fxTransfer.initiatingFsp, - Enum.Http.Headers.FSPIOP.SWITCH.value, + Config.HUB_NAME, metadata, headers, fspiopError, @@ -107,7 +107,7 @@ const processFxPositionPrepareBin = async ( // set destination to payerfsp and source to switch const headers = { ...binItem.message.value.content.headers } headers[Enum.Http.Headers.FSPIOP.DESTINATION] = fxTransfer.initiatingFsp - headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value + headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME delete headers['content-length'] const fspiopError = ErrorHandler.Factory.createFSPIOPError( @@ -130,7 +130,7 @@ const processFxPositionPrepareBin = async ( resultMessage = Utility.StreamingProtocol.createMessage( fxTransfer.commitRequestId, fxTransfer.initiatingFsp, 
- Enum.Http.Headers.FSPIOP.SWITCH.value, + Config.HUB_NAME, metadata, headers, fspiopError, @@ -149,7 +149,7 @@ const processFxPositionPrepareBin = async ( // set destination to payerfsp and source to switch const headers = { ...binItem.message.value.content.headers } headers[Enum.Http.Headers.FSPIOP.DESTINATION] = fxTransfer.initiatingFsp - headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value + headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME delete headers['content-length'] const fspiopError = ErrorHandler.Factory.createFSPIOPError( @@ -172,7 +172,7 @@ const processFxPositionPrepareBin = async ( resultMessage = Utility.StreamingProtocol.createMessage( fxTransfer.commitRequestId, fxTransfer.initiatingFsp, - Enum.Http.Headers.FSPIOP.SWITCH.value, + Config.HUB_NAME, metadata, headers, fspiopError, diff --git a/src/domain/position/prepare.js b/src/domain/position/prepare.js index 81c4573f8..3d23ce80a 100644 --- a/src/domain/position/prepare.js +++ b/src/domain/position/prepare.js @@ -67,7 +67,7 @@ const processPositionPrepareBin = async ( // set destination to payerfsp and source to switch const headers = { ...binItem.message.value.content.headers } headers[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payerFsp - headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value + headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME delete headers['content-length'] const fspiopError = ErrorHandler.Factory.createFSPIOPError( @@ -90,7 +90,7 @@ const processPositionPrepareBin = async ( resultMessage = Utility.StreamingProtocol.createMessage( transfer.transferId, transfer.payerFsp, - Enum.Http.Headers.FSPIOP.SWITCH.value, + Config.HUB_NAME, metadata, headers, fspiopError, @@ -109,7 +109,7 @@ const processPositionPrepareBin = async ( // set destination to payerfsp and source to switch const headers = { ...binItem.message.value.content.headers } headers[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payerFsp - headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value + headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME delete headers['content-length'] const fspiopError = ErrorHandler.Factory.createFSPIOPError( @@ -132,7 +132,7 @@ const processPositionPrepareBin = async ( resultMessage = Utility.StreamingProtocol.createMessage( transfer.transferId, transfer.payerFsp, - Enum.Http.Headers.FSPIOP.SWITCH.value, + Config.HUB_NAME, metadata, headers, fspiopError, @@ -151,7 +151,7 @@ const processPositionPrepareBin = async ( // set destination to payerfsp and source to switch const headers = { ...binItem.message.value.content.headers } headers[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payerFsp - headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value + headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME delete headers['content-length'] const fspiopError = ErrorHandler.Factory.createFSPIOPError( @@ -174,7 +174,7 @@ const processPositionPrepareBin = async ( resultMessage = Utility.StreamingProtocol.createMessage( transfer.transferId, transfer.payerFsp, - Enum.Http.Headers.FSPIOP.SWITCH.value, + Config.HUB_NAME, metadata, headers, fspiopError, diff --git a/src/handlers/bulk/fulfil/handler.js b/src/handlers/bulk/fulfil/handler.js index 1a94f3b45..2166fdaa8 100644 --- a/src/handlers/bulk/fulfil/handler.js +++ b/src/handlers/bulk/fulfil/handler.js @@ -110,7 +110,7 @@ const bulkFulfil = async (error, messages) => { Logger.isErrorEnabled && Logger.error(Util.breadcrumb(location, 
`callbackErrorModified--${actionLetter}2`)) const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST) const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } @@ -134,7 +134,7 @@ const bulkFulfil = async (error, messages) => { const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action } params.message.value.content.uriParams = { id: bulkTransferId } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } try { @@ -240,7 +240,7 @@ const bulkFulfil = async (error, messages) => { const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action } params.message.value.content.uriParams = { id: bulkTransferId } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorGeneric--${actionLetter}8`)) @@ -248,7 +248,7 @@ const bulkFulfil = async (error, messages) => { const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action } params.message.value.content.uriParams = { id: bulkTransferId } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: validationFspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: validationFspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw validationFspiopError } } catch (err) { @@ -293,7 +293,7 @@ const sendIndividualTransfer = async (message, messageId, kafkaTopic, headers, p value: Util.StreamingProtocol.createMessage(messageId, headers[Enum.Http.Headers.FSPIOP.DESTINATION], headers[Enum.Http.Headers.FSPIOP.SOURCE], metadata, headers, dataUri, { id: transferId }) } params = { message: msg, kafkaTopic, consumer: Consumer, producer: Producer } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, eventDetail }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, eventDetail, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) } diff --git a/src/handlers/bulk/get/handler.js b/src/handlers/bulk/get/handler.js index 571d55c36..9eb65d790 100644 --- a/src/handlers/bulk/get/handler.js +++ b/src/handlers/bulk/get/handler.js @@ -88,7 +88,7 @@ const getBulkTransfer = async (error, messages) => { if (!(await Validator.validateParticipantByName(message.value.from)).isValid) { 
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `breakParticipantDoesntExist--${actionLetter}1`)) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) return true } @@ -97,7 +97,7 @@ const getBulkTransfer = async (error, messages) => { if (!bulkTransferLight) { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorBulkTransferNotFound--${actionLetter}3`)) const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.BULK_TRANSFER_ID_NOT_FOUND, 'Provided Bulk Transfer ID was not found on the server.') - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } // The SD says this should be 404 response which I think will not be constent with single transfers @@ -106,7 +106,7 @@ const getBulkTransfer = async (error, messages) => { if (![participants.payeeFsp, participants.payerFsp].includes(message.value.from)) { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorNotBulkTransferParticipant--${actionLetter}2`)) const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.CLIENT_ERROR) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } const isPayeeRequest = participants.payeeFsp === message.value.from @@ -129,9 +129,9 @@ const getBulkTransfer = async (error, messages) => { } message.value.content.payload = payload if (fspiopError) { - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) } else { - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch, hubName: Config.HUB_NAME }) } histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) return true diff --git a/src/handlers/bulk/prepare/handler.js b/src/handlers/bulk/prepare/handler.js index 6dedb551e..5dc7656e0 100644 --- a/src/handlers/bulk/prepare/handler.js +++ b/src/handlers/bulk/prepare/handler.js @@ -145,15 +145,15 @@ const bulkPrepare = async (error, messages) => { params.message.value.content.payload = payload params.message.value.content.uriParams = { id: bulkTransferId } if (fspiopError) { - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: 
fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) } else { - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch, hubName: Config.HUB_NAME }) } return true } else { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, 'inProgress')) Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `ignore--${actionLetter}3`)) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, hubName: Config.HUB_NAME }) return true } } @@ -165,7 +165,7 @@ const bulkPrepare = async (error, messages) => { const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action } params.message.value.content.uriParams = { id: bulkTransferId } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } @@ -183,7 +183,7 @@ const bulkPrepare = async (error, messages) => { const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action } params.message.value.content.uriParams = { id: bulkTransferId } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } try { @@ -212,7 +212,7 @@ const bulkPrepare = async (error, messages) => { } params = { message: msg, kafkaTopic, consumer: Consumer, producer: Producer } const eventDetail = { functionality: Enum.Events.Event.Type.PREPARE, action: Enum.Events.Event.Action.BULK_PREPARE } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) } } catch (err) { // handle individual transfers streaming error @@ -221,7 +221,7 @@ const bulkPrepare = async (error, messages) => { const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action } params.message.value.content.uriParams = { id: bulkTransferId } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } } else { // handle validation failure @@ -257,7 +257,7 @@ const bulkPrepare = async (error, messages) => { const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action } params.message.value.content.uriParams = { id: bulkTransferId } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { 
consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } // produce validation error callback notification to payer @@ -266,7 +266,7 @@ const bulkPrepare = async (error, messages) => { const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action } params.message.value.content.uriParams = { id: bulkTransferId } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: validationFspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: validationFspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw validationFspiopError } } catch (err) { diff --git a/src/handlers/bulk/processing/handler.js b/src/handlers/bulk/processing/handler.js index 1c2bf42dd..b89226bdb 100644 --- a/src/handlers/bulk/processing/handler.js +++ b/src/handlers/bulk/processing/handler.js @@ -32,7 +32,6 @@ const Logger = require('@mojaloop/central-services-logger') const BulkTransferService = require('../../../domain/bulkTransfer') const Util = require('@mojaloop/central-services-shared').Util -const Kafka = require('@mojaloop/central-services-shared').Util.Kafka const Producer = require('@mojaloop/central-services-stream').Util.Producer const Consumer = require('@mojaloop/central-services-stream').Util.Consumer const Enum = require('@mojaloop/central-services-shared').Enum @@ -41,6 +40,8 @@ const Config = require('../../../lib/config') const decodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodePayload const BulkTransferModels = require('@mojaloop/object-store-lib').Models.BulkTransfer const ErrorHandler = require('@mojaloop/central-services-error-handling') +const Kafka = Util.Kafka +const HeaderValidation = Util.HeaderValidation const location = { module: 'BulkProcessingHandler', method: '', path: '' } // var object used as pointer @@ -295,7 +296,7 @@ const bulkProcessing = async (error, messages) => { }) const metadata = Util.StreamingProtocol.createMetadataWithCorrelatedEvent(params.message.value.metadata.event.id, params.message.value.metadata.type, params.message.value.metadata.action, Enum.Events.EventStatus.SUCCESS) params.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, payeeBulkResponse.destination, payeeBulkResponse.headers[Enum.Http.Headers.FSPIOP.SOURCE], metadata, payeeBulkResponse.headers, payload) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) return true } else { @@ -310,7 +311,7 @@ const bulkProcessing = async (error, messages) => { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `bulkFulfil--${actionLetter}3`)) const participants = await BulkTransferService.getParticipantsById(bulkTransferInfo.bulkTransferId) const normalizedKeys = Object.keys(headers).reduce((keys, k) => { keys[k.toLowerCase()] = k; return keys }, {}) - const payeeBulkResponseHeaders = Util.Headers.transformHeaders(headers, { httpMethod: headers[normalizedKeys[Enum.Http.Headers.FSPIOP.HTTP_METHOD]], sourceFsp: Enum.Http.Headers.FSPIOP.SWITCH.value, destinationFsp: participants.payeeFsp }) + const payeeBulkResponseHeaders = 
Util.Headers.transformHeaders(headers, { httpMethod: headers[normalizedKeys[Enum.Http.Headers.FSPIOP.HTTP_METHOD]], sourceFsp: Config.HUB_NAME, destinationFsp: participants.payeeFsp, hubNameRegex: HeaderValidation.getHubNameRegex(Config.HUB_NAME) }) delete payeeBulkResponseHeaders[normalizedKeys[Enum.Http.Headers.FSPIOP.SIGNATURE]] const payerBulkResponse = Object.assign({}, { messageId: message.value.id, headers: Util.clone(headers) }, getBulkTransferByIdResult.payerBulkTransfer) const payeeBulkResponse = Object.assign({}, { messageId: message.value.id, headers: payeeBulkResponseHeaders }, getBulkTransferByIdResult.payeeBulkTransfer) @@ -344,13 +345,13 @@ const bulkProcessing = async (error, messages) => { payerParams.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, participants.payerFsp, payerBulkResponse.headers[normalizedKeys[Enum.Http.Headers.FSPIOP.SOURCE]], payerMetadata, payerBulkResponse.headers, payerPayload) const payeeMetadata = Util.StreamingProtocol.createMetadataWithCorrelatedEvent(params.message.value.metadata.event.id, payeeParams.message.value.metadata.type, payeeParams.message.value.metadata.action, Enum.Events.EventStatus.SUCCESS) - payeeParams.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, participants.payeeFsp, Enum.Http.Headers.FSPIOP.SWITCH.value, payeeMetadata, payeeBulkResponse.headers, payeePayload) + payeeParams.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, participants.payeeFsp, Config.HUB_NAME, payeeMetadata, payeeBulkResponse.headers, payeePayload) if ([Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) { eventDetail.action = Enum.Events.Event.Action.BULK_COMMIT } else if ([Enum.Events.Event.Action.BULK_ABORT].includes(action)) { eventDetail.action = Enum.Events.Event.Action.BULK_ABORT } - await Kafka.proceed(Config.KAFKA_CONFIG, payerParams, { consumerCommit, eventDetail }) + await Kafka.proceed(Config.KAFKA_CONFIG, payerParams, { consumerCommit, eventDetail, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) await Kafka.proceed(Config.KAFKA_CONFIG, payeeParams, { consumerCommit, eventDetail }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) @@ -359,7 +360,7 @@ const bulkProcessing = async (error, messages) => { const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED, null, null, null, payload.extensionList) eventDetail.action = Enum.Events.Event.Action.BULK_ABORT params.message.value.content.uriParams.id = bulkTransferInfo.bulkTransferId - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, hubName: Config.HUB_NAME }) throw fspiopError } else { // TODO: For the following (Internal Server Error) scenario a notification is produced for each individual transfer. 
@@ -367,7 +368,7 @@ const bulkProcessing = async (error, messages) => { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `invalidEventTypeOrAction--${actionLetter}4`)) const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(`Invalid event action:(${action}) and/or type:(${eventType})`).toApiErrorObject(Config.ERROR_HANDLING) const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action: Enum.Events.Event.Action.BULK_PROCESSING } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError, eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError, eventDetail, fromSwitch, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) return true } diff --git a/src/handlers/bulk/shared/validator.js b/src/handlers/bulk/shared/validator.js index a54b039ff..af1ea4e1c 100644 --- a/src/handlers/bulk/shared/validator.js +++ b/src/handlers/bulk/shared/validator.js @@ -95,7 +95,7 @@ const validateFspiopSourceAndDestination = async (payload, headers) => { // Due to the Bulk [Design Considerations](https://docs.mojaloop.io/technical/central-bulk-transfers/#_2-design-considerations), // it is possible that the Switch may send a POST Request to the Payee FSP with the Source Header containing "Switch", // and the Payee FSP thus responding with a PUT Callback and destination header containing the same value (Switch). - (headers[Enum.Http.Headers.FSPIOP.DESTINATION] === Enum.Http.Headers.FSPIOP.SWITCH.value) + (headers[Enum.Http.Headers.FSPIOP.DESTINATION] === Config.HUB_NAME) ) ) diff --git a/src/handlers/positions/handler.js b/src/handlers/positions/handler.js index d32f7e135..009823b0d 100644 --- a/src/handlers/positions/handler.js +++ b/src/handlers/positions/handler.js @@ -159,7 +159,7 @@ const positions = async (error, messages) => { const { transferState, fspiopError } = prepareMessage if (transferState.transferStateId === Enum.Transfers.TransferState.RESERVED) { Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, `payer--${actionLetter}1`)) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId, action }) return true } else { @@ -167,7 +167,7 @@ const positions = async (error, messages) => { const responseFspiopError = fspiopError || ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR) const fspiopApiError = responseFspiopError.toApiErrorObject(Config.ERROR_HANDLING) await TransferService.logTransferError(transferId, fspiopApiError.errorInformation.errorCode, fspiopApiError.errorInformation.errorDescription) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopApiError, eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopApiError, eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw responseFspiopError } } @@ -177,7 +177,7 @@ const positions = async (error, messages) => { if (transferInfo.transferStateId !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL) { Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, `validationFailed::notReceivedFulfilState1--${actionLetter}3`)) const fspiopError = 
ErrorHandler.Factory.createInternalServerFSPIOPError(`Invalid State: ${transferInfo.transferStateId} - expected: ${Enum.Transfers.TransferInternalState.RECEIVED_FULFIL}`) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } else { Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, `payee--${actionLetter}4`)) @@ -191,7 +191,7 @@ const positions = async (error, messages) => { const transfer = await TransferService.getById(transferInfo.transferId) message.value.content.payload = TransferObjectTransform.toFulfil(transfer) } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId, action }) return true } @@ -214,7 +214,7 @@ const positions = async (error, messages) => { reason: transferInfo.reason } await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId, action }) return true } else if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.TIMEOUT_RESERVED, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) { @@ -240,7 +240,8 @@ const positions = async (error, messages) => { { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), - eventDetail + eventDetail, + hubName: Config.HUB_NAME }) throw fspiopError } @@ -248,7 +249,7 @@ const positions = async (error, messages) => { Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, `invalidEventTypeOrAction--${actionLetter}8`)) const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(`Invalid event action:(${action}) and/or type:(${eventType})`) const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action: Enum.Events.Event.Action.POSITION } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } } catch (err) { diff --git a/src/handlers/positions/handlerBatch.js b/src/handlers/positions/handlerBatch.js index 54249b0e9..9186efd8f 100644 --- a/src/handlers/positions/handlerBatch.js +++ b/src/handlers/positions/handlerBatch.js @@ -141,7 +141,7 @@ const positions = async (error, messages) => { for (const message of Object.values(lastPerPartition)) { const params = { message, kafkaTopic: message.topic, consumer: Consumer } // We are using Kafka.proceed() to just commit the offset of the last message in the array - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { 
consumerCommit, hubName: Config.HUB_NAME }) } // Commit DB transaction diff --git a/src/handlers/timeouts/handler.js b/src/handlers/timeouts/handler.js index 4cf120955..88f6124ca 100644 --- a/src/handlers/timeouts/handler.js +++ b/src/handlers/timeouts/handler.js @@ -61,7 +61,7 @@ const _processTimedOutTransfers = async (transferTimeoutList) => { try { const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription) const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(transferTimeoutList[i].transferId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state) - const headers = Utility.Http.SwitchDefaultHeaders(transferTimeoutList[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Enum.Http.Headers.FSPIOP.SWITCH.value, resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion) + const headers = Utility.Http.SwitchDefaultHeaders(transferTimeoutList[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Config.HUB_NAME, resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion) const message = Utility.StreamingProtocol.createMessage(transferTimeoutList[i].transferId, transferTimeoutList[i].payeeFsp, transferTimeoutList[i].payerFsp, metadata, headers, fspiopError, { id: transferTimeoutList[i].transferId }, `application/vnd.interoperability.${Enum.Http.HeaderResources.TRANSFERS}+json;version=${resourceVersions[Enum.Http.HeaderResources.TRANSFERS].contentVersion}`) span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED)) await span.audit({ @@ -73,7 +73,7 @@ const _processTimedOutTransfers = async (transferTimeoutList) => { if (transferTimeoutList[i].bulkTransferId === null) { // regular transfer if (transferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { message.to = message.from - message.from = Enum.Http.Headers.FSPIOP.SWITCH.value + message.from = Config.HUB_NAME // event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, message, state, null, span) } else if (transferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) { @@ -95,7 +95,7 @@ const _processTimedOutTransfers = async (transferTimeoutList) => { } else { // individual transfer from a bulk if (transferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { message.to = message.from - message.from = Enum.Http.Headers.FSPIOP.SWITCH.value + message.from = Config.HUB_NAME message.metadata.event.type = Enum.Events.Event.Type.BULK_PROCESSING message.metadata.event.action = Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.BULK_PROCESSING, Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, message, state, null, span) @@ -133,7 +133,7 @@ const _processFxTimedOutTransfers = async (fxTransferTimeoutList) => { try { const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription) const metadata = 
Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(fxTransferTimeoutList[i].commitRequestId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state) - const headers = Utility.Http.SwitchDefaultHeaders(fxTransferTimeoutList[i].initiatingFsp, Enum.Http.HeaderResources.FX_TRANSFERS, Enum.Http.Headers.FSPIOP.SWITCH.value, resourceVersions[Enum.Http.HeaderResources.FX_TRANSFERS].contentVersion) + const headers = Utility.Http.SwitchDefaultHeaders(fxTransferTimeoutList[i].initiatingFsp, Enum.Http.HeaderResources.FX_TRANSFERS, Config.HUB_NAME, resourceVersions[Enum.Http.HeaderResources.FX_TRANSFERS].contentVersion) const message = Utility.StreamingProtocol.createMessage(fxTransferTimeoutList[i].commitRequestId, fxTransferTimeoutList[i].counterPartyFsp, fxTransferTimeoutList[i].initiatingFsp, metadata, headers, fspiopError, { id: fxTransferTimeoutList[i].commitRequestId }, `application/vnd.interoperability.${Enum.Http.HeaderResources.FX_TRANSFERS}+json;version=${resourceVersions[Enum.Http.HeaderResources.FX_TRANSFERS].contentVersion}`) span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.FX_TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED)) await span.audit({ @@ -144,7 +144,7 @@ const _processFxTimedOutTransfers = async (fxTransferTimeoutList) => { }, EventSdk.AuditEventAction.start) if (fxTransferTimeoutList[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) { message.to = message.from - message.from = Enum.Http.Headers.FSPIOP.SWITCH.value + message.from = Config.HUB_NAME // event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED await Kafka.produceGeneralMessage( Config.KAFKA_CONFIG, Producer, diff --git a/src/handlers/transfers/FxFulfilService.js b/src/handlers/transfers/FxFulfilService.js index e69a5fb39..fa88634db 100644 --- a/src/handlers/transfers/FxFulfilService.js +++ b/src/handlers/transfers/FxFulfilService.js @@ -51,16 +51,25 @@ class FxFulfilService { } async getFxTransferDetails(commitRequestId, functionality) { - const transfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestId(commitRequestId) + const transfer = + await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestId( + commitRequestId + ) if (!transfer) { const fspiopError = fspiopErrorFactory.fxTransferNotFound() - const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) + const apiFSPIOPError = fspiopError.toApiErrorObject( + this.Config.ERROR_HANDLING + ) const eventDetail = { functionality, action: Action.FX_RESERVE } - this.log.warn('fxTransfer not found', { commitRequestId, eventDetail, apiFSPIOPError }) + this.log.warn('fxTransfer not found', { + commitRequestId, + eventDetail, + apiFSPIOPError + }) await this.kafkaProceed({ consumerCommit, @@ -78,23 +87,39 @@ class FxFulfilService { async validateHeaders({ transfer, headers, payload }) { let fspiopError = null - if (headers[SOURCE]?.toLowerCase() !== transfer.counterPartyFspName.toLowerCase()) { + if ( + headers[SOURCE]?.toLowerCase() !== + transfer.counterPartyFspName.toLowerCase() + ) { fspiopError = fspiopErrorFactory.fxHeaderSourceValidationError() } - if (headers[DESTINATION]?.toLowerCase() !== transfer.initiatingFspName.toLowerCase()) { + if ( + headers[DESTINATION]?.toLowerCase() !== + transfer.initiatingFspName.toLowerCase() + ) { fspiopError = fspiopErrorFactory.fxHeaderDestinationValidationError() } if (fspiopError) { - const 
apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) + const apiFSPIOPError = fspiopError.toApiErrorObject( + this.Config.ERROR_HANDLING + ) const eventDetail = { functionality: Type.POSITION, action: Action.FX_ABORT_VALIDATION } - this.log.warn('headers validation error', { eventDetail, apiFSPIOPError }) + this.log.warn('headers validation error', { + eventDetail, + apiFSPIOPError + }) // Lets handle the abort validation and change the fxTransfer state to reflect this - await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, eventDetail.action, apiFSPIOPError) + await this.FxTransferModel.fxTransfer.saveFxFulfilResponse( + transfer.commitRequestId, + payload, + eventDetail.action, + apiFSPIOPError + ) // Publish message to FX Position Handler await this.kafkaProceed({ @@ -128,9 +153,19 @@ class FxFulfilService { ) } - async checkDuplication({ dupCheckResult, transfer, functionality, action, type }) { + async checkDuplication({ + dupCheckResult, + transfer, + functionality, + action, + type + }) { const transferStateEnum = transfer?.transferStateEnumeration - this.log.info('fxTransfer checkDuplication...', { dupCheckResult, action, transferStateEnum }) + this.log.info('fxTransfer checkDuplication...', { + dupCheckResult, + action, + transferStateEnum + }) if (!dupCheckResult.hasDuplicateId) { this.log.debug('No duplication found') @@ -140,12 +175,20 @@ class FxFulfilService { if (!dupCheckResult.hasDuplicateHash) { // ERROR: We've seen fxTransfer of this ID before, but it's message hash doesn't match the previous message hash. const fspiopError = fspiopErrorFactory.noFxDuplicateHash() - const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) + const apiFSPIOPError = fspiopError.toApiErrorObject( + this.Config.ERROR_HANDLING + ) const eventDetail = { functionality, - action: action === Action.FX_ABORT ? Action.FX_ABORT_DUPLICATE : Action.FX_FULFIL_DUPLICATE + action: + action === Action.FX_ABORT + ? Action.FX_ABORT_DUPLICATE + : Action.FX_FULFIL_DUPLICATE } - this.log.warn('callbackErrorModified - no hasDuplicateHash', { eventDetail, apiFSPIOPError }) + this.log.warn('callbackErrorModified - no hasDuplicateHash', { + eventDetail, + apiFSPIOPError + }) await this.kafkaProceed({ consumerCommit, @@ -158,18 +201,30 @@ class FxFulfilService { // This is a duplicate message for a fxTransfer that is already in a finalized state // respond as if we received a GET /fxTransfers/{ID} from the client - if ([TransferState.COMMITTED, TransferState.ABORTED].includes(transferStateEnum)) { - this.params.message.value.content.payload = this.transform.toFulfil(transfer) + if ( + [TransferState.COMMITTED, TransferState.ABORTED].includes( + transferStateEnum + ) + ) { + this.params.message.value.content.payload = + this.transform.toFulfil(transfer) const eventDetail = { functionality, - action: action === Action.FX_ABORT ? Action.FX_ABORT_DUPLICATE : Action.FX_FULFIL_DUPLICATE + action: + action === Action.FX_ABORT + ? 
Action.FX_ABORT_DUPLICATE + : Action.FX_FULFIL_DUPLICATE } this.log.info('eventDetail:', { eventDetail }) await this.kafkaProceed({ consumerCommit, eventDetail, fromSwitch }) return true } - if ([TransferState.RECEIVED, TransferState.RESERVED].includes(transferStateEnum)) { + if ( + [TransferState.RECEIVED, TransferState.RESERVED].includes( + transferStateEnum + ) + ) { this.log.info('state: RECEIVED or RESERVED') await this.kafkaProceed({ consumerCommit }) // this code doesn't publish any message to kafka, coz we don't provide eventDetail: @@ -178,13 +233,22 @@ class FxFulfilService { } // Error scenario - fxTransfer.transferStateEnumeration is in some invalid state - const fspiopError = fspiopErrorFactory.invalidFxTransferState({ transferStateEnum, action, type }) - const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) + const fspiopError = fspiopErrorFactory.invalidFxTransferState({ + transferStateEnum, + action, + type + }) + const apiFSPIOPError = fspiopError.toApiErrorObject( + this.Config.ERROR_HANDLING + ) const eventDetail = { functionality, action: Action.FX_RESERVE } - this.log.warn('callbackErrorInvalidTransferStateEnum', { eventDetail, apiFSPIOPError }) + this.log.warn('callbackErrorInvalidTransferStateEnum', { + eventDetail, + apiFSPIOPError + }) await this.kafkaProceed({ consumerCommit, fspiopError: apiFSPIOPError, @@ -198,12 +262,18 @@ class FxFulfilService { async validateEventType(type, functionality) { if (type !== Type.FULFIL) { const fspiopError = fspiopErrorFactory.invalidEventType(type) - const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) + const apiFSPIOPError = fspiopError.toApiErrorObject( + this.Config.ERROR_HANDLING + ) const eventDetail = { functionality, action: Action.FX_RESERVE } - this.log.warn('callbackErrorInvalidEventType', { type, eventDetail, apiFSPIOPError }) + this.log.warn('callbackErrorInvalidEventType', { + type, + eventDetail, + apiFSPIOPError + }) await this.kafkaProceed({ consumerCommit, @@ -217,23 +287,39 @@ class FxFulfilService { } async validateFulfilment(transfer, payload) { - const isValid = this.validateFulfilCondition(payload.fulfilment, transfer.ilpCondition) + const isValid = this.validateFulfilCondition( + payload.fulfilment, + transfer.ilpCondition + ) if (!isValid) { const fspiopError = fspiopErrorFactory.fxInvalidFulfilment() - const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) + const apiFSPIOPError = fspiopError.toApiErrorObject( + this.Config.ERROR_HANDLING + ) const eventDetail = { functionality: Type.POSITION, action: Action.FX_ABORT_VALIDATION } - this.log.warn('callbackErrorInvalidFulfilment', { eventDetail, apiFSPIOPError, transfer, payload }) - await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, eventDetail.action, apiFSPIOPError) + this.log.warn('callbackErrorInvalidFulfilment', { + eventDetail, + apiFSPIOPError, + transfer, + payload + }) + await this.FxTransferModel.fxTransfer.saveFxFulfilResponse( + transfer.commitRequestId, + payload, + eventDetail.action, + apiFSPIOPError + ) await this.kafkaProceed({ consumerCommit, fspiopError: apiFSPIOPError, eventDetail, - messageKey: transfer.counterPartyFspTargetParticipantCurrencyId.toString() + messageKey: + transfer.counterPartyFspTargetParticipantCurrencyId.toString() }) throw fspiopError } @@ -245,12 +331,18 @@ class FxFulfilService { async validateTransferState(transfer, functionality) { if (transfer.transferState !== TransferState.RESERVED) { 
const fspiopError = fspiopErrorFactory.fxTransferNonReservedState() - const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) + const apiFSPIOPError = fspiopError.toApiErrorObject( + this.Config.ERROR_HANDLING + ) const eventDetail = { functionality, action: Action.FX_RESERVE } - this.log.warn('callbackErrorNonReservedState', { eventDetail, apiFSPIOPError, transfer }) + this.log.warn('callbackErrorNonReservedState', { + eventDetail, + apiFSPIOPError, + transfer + }) await this.kafkaProceed({ consumerCommit, @@ -265,14 +357,21 @@ class FxFulfilService { } async validateExpirationDate(transfer, functionality) { - if (transfer.expirationDate <= new Date(Util.Time.getUTCString(new Date()))) { + if ( + transfer.expirationDate <= new Date(Util.Time.getUTCString(new Date())) + ) { const fspiopError = fspiopErrorFactory.fxTransferExpired() - const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) + const apiFSPIOPError = fspiopError.toApiErrorObject( + this.Config.ERROR_HANDLING + ) const eventDetail = { functionality, action: Action.FX_RESERVE } - this.log.warn('callbackErrorTransferExpired', { eventDetail, apiFSPIOPError }) + this.log.warn('callbackErrorTransferExpired', { + eventDetail, + apiFSPIOPError + }) await this.kafkaProceed({ consumerCommit, @@ -285,20 +384,30 @@ class FxFulfilService { } async processFxAbortAction({ transfer, payload, action }) { - const fspiopError = fspiopErrorFactory.fromErrorInformation(payload.errorInformation) - const apiFSPIOPError = fspiopError.toApiErrorObject(this.Config.ERROR_HANDLING) + const fspiopError = fspiopErrorFactory.fromErrorInformation( + payload.errorInformation + ) + const apiFSPIOPError = fspiopError.toApiErrorObject( + this.Config.ERROR_HANDLING + ) const eventDetail = { functionality: Type.POSITION, action } this.log.warn('FX_ABORT case', { eventDetail, apiFSPIOPError }) - await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, action, apiFSPIOPError) + await this.FxTransferModel.fxTransfer.saveFxFulfilResponse( + transfer.commitRequestId, + payload, + action, + apiFSPIOPError + ) await this.kafkaProceed({ consumerCommit, fspiopError: apiFSPIOPError, eventDetail, - messageKey: transfer.counterPartyFspTargetParticipantCurrencyId.toString() + messageKey: + transfer.counterPartyFspTargetParticipantCurrencyId.toString() // todo: think if we need to use cyrilOutput to get counterPartyFspTargetParticipantCurrencyId? 
}) @@ -306,8 +415,15 @@ class FxFulfilService { } async processFxFulfil({ transfer, payload, action }) { - await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, action) - const cyrilOutput = await this.cyril.processFxFulfilMessage(transfer.commitRequestId, payload) + await this.FxTransferModel.fxTransfer.saveFxFulfilResponse( + transfer.commitRequestId, + payload, + action + ) + const cyrilOutput = await this.cyril.processFxFulfilMessage( + transfer.commitRequestId, + payload + ) const eventDetail = { functionality: Type.POSITION, action @@ -317,27 +433,37 @@ class FxFulfilService { await this.kafkaProceed({ consumerCommit, eventDetail, - messageKey: cyrilOutput.counterPartyFspSourceParticipantCurrencyId.toString(), - topicNameOverride: this.Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT + messageKey: + cyrilOutput.counterPartyFspSourceParticipantCurrencyId.toString(), + topicNameOverride: + this.Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT }) return true } async kafkaProceed(kafkaOpts) { - return this.Kafka.proceed( - this.Config.KAFKA_CONFIG, - this.params, - kafkaOpts - ) + return this.Kafka.proceed(this.Config.KAFKA_CONFIG, this.params, { + hubName: this.Config.HUB_NAME, + ...kafkaOpts + }) } validateFulfilCondition(fulfilment, condition) { try { - const isValid = fulfilment && this.Validator.validateFulfilCondition(fulfilment, condition) - this.log.debug('validateFulfilCondition result:', { isValid, fulfilment, condition }) + const isValid = + fulfilment && + this.Validator.validateFulfilCondition(fulfilment, condition) + this.log.debug('validateFulfilCondition result:', { + isValid, + fulfilment, + condition + }) return isValid } catch (err) { - this.log.warn(`validateFulfilCondition error: ${err?.message}`, { fulfilment, condition }) + this.log.warn(`validateFulfilCondition error: ${err?.message}`, { + fulfilment, + condition + }) return false } } @@ -346,7 +472,9 @@ class FxFulfilService { if (!message?.value) { throw TypeError('Invalid message format!') } - const payload = Util.StreamingProtocol.decodePayload(message.value.content.payload) + const payload = Util.StreamingProtocol.decodePayload( + message.value.content.payload + ) const { headers } = message.value.content const { type, action } = message.value.metadata.event const commitRequestId = message.value.content.uriParams.id diff --git a/src/handlers/transfers/handler.js b/src/handlers/transfers/handler.js index c44bac527..a31440e48 100644 --- a/src/handlers/transfers/handler.js +++ b/src/handlers/transfers/handler.js @@ -194,7 +194,7 @@ const processFulfilMessage = async (message, functionality, span) => { * HOWTO: The list of individual transfers being committed should contain * non-existing transferId */ - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError // Lets validate FSPIOP Source & Destination Headers @@ -244,7 +244,7 @@ const processFulfilMessage = async (message, functionality, span) => { // Publish message to Position Handler // Key position abort with payer account id const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION) - await 
Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, fromSwitch, toDestination: transfer.payerFsp, messageKey: payerAccount.participantCurrencyId.toString() }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, fromSwitch, toDestination: transfer.payerFsp, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME }) /** * Send patch notification callback to original payee fsp if they asked for a a patch response. @@ -274,7 +274,7 @@ const processFulfilMessage = async (message, functionality, span) => { } } message.value.content.payload = reservedAbortedPayload - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail: reserveAbortedEventDetail, fromSwitch: true, toDestination: transfer.payeeFsp }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail: reserveAbortedEventDetail, fromSwitch: true, toDestination: transfer.payeeFsp, hubName: Config.HUB_NAME }) } throw apiFSPIOPError @@ -316,7 +316,7 @@ const processFulfilMessage = async (message, functionality, span) => { eventDetail.action = TransferEventAction.ABORT_DUPLICATE } } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) return true } @@ -330,7 +330,7 @@ const processFulfilMessage = async (message, functionality, span) => { * * TODO: find a way to trigger this code branch and handle it at BulkProcessingHandler (not in scope of #967) */ - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) return true } @@ -343,7 +343,7 @@ const processFulfilMessage = async (message, functionality, span) => { /** * HOWTO: Impossible to trigger for individual transfer in a bulk? (not in scope of #967) */ - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError, eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError, eventDetail, fromSwitch, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) return true } @@ -363,7 +363,7 @@ const processFulfilMessage = async (message, functionality, span) => { * but use different fulfilment value. 
*/ const eventDetail = { functionality, action } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } @@ -376,7 +376,7 @@ const processFulfilMessage = async (message, functionality, span) => { /** * TODO: BulkProcessingHandler (not in scope of #967) */ - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } @@ -395,7 +395,7 @@ const processFulfilMessage = async (message, functionality, span) => { /** * TODO: BulkProcessingHandler (not in scope of #967) */ - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } @@ -411,7 +411,7 @@ const processFulfilMessage = async (message, functionality, span) => { */ // Key position validation abort with payer account id const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, messageKey: payerAccount.participantCurrencyId.toString() }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: apiFSPIOPError, eventDetail, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME }) // emit an extra message - RESERVED_ABORTED if action === TransferEventAction.RESERVE if (action === TransferEventAction.RESERVE) { @@ -441,7 +441,7 @@ const processFulfilMessage = async (message, functionality, span) => { } } message.value.content.payload = reservedAbortedPayload - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch: true, toDestination: transfer.payeeFsp }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch: true, toDestination: transfer.payeeFsp, hubName: Config.HUB_NAME }) } throw fspiopError } @@ -453,7 +453,7 @@ const processFulfilMessage = async (message, functionality, span) => { /** * TODO: BulkProcessingHandler (not in scope of #967) */ - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) // emit an extra message - RESERVED_ABORTED if action === TransferEventAction.RESERVE if (action === TransferEventAction.RESERVE) { @@ -469,7 +469,7 @@ const processFulfilMessage = async (message, functionality, span) => { transferState: TransferState.ABORTED } message.value.content.payload = reservedAbortedPayload - await 
Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch: true, toDestination: transfer.payeeFsp }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch: true, toDestination: transfer.payeeFsp, hubName: Config.HUB_NAME }) } throw fspiopError } @@ -481,7 +481,7 @@ const processFulfilMessage = async (message, functionality, span) => { /** * TODO: BulkProcessingHandler (not in scope of #967) */ - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) // emit an extra message - RESERVED_ABORTED if action === TransferEventAction.RESERVE if (action === TransferEventAction.RESERVE) { @@ -497,7 +497,7 @@ const processFulfilMessage = async (message, functionality, span) => { transferState: TransferState.ABORTED } message.value.content.payload = reservedAbortedPayload - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch: true }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch: true, hubName: Config.HUB_NAME }) } throw fspiopError } @@ -530,7 +530,7 @@ const processFulfilMessage = async (message, functionality, span) => { } if (cyrilResult.positionChanges.length > 0) { const participantCurrencyId = cyrilResult.positionChanges[0].participantCurrencyId - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, messageKey: participantCurrencyId.toString(), topicNameOverride }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, messageKey: participantCurrencyId.toString(), topicNameOverride, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) } else { histTimerEnd({ success: false, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) @@ -539,7 +539,7 @@ const processFulfilMessage = async (message, functionality, span) => { } } else { const payeeAccount = await Participant.getAccountByNameAndCurrency(transfer.payeeFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, messageKey: payeeAccount.participantCurrencyId.toString(), topicNameOverride }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, messageKey: payeeAccount.participantCurrencyId.toString(), topicNameOverride, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) } return true @@ -573,14 +573,14 @@ const processFulfilMessage = async (message, functionality, span) => { const eventDetail = { functionality: TransferEventType.POSITION, action } // Key position abort with payer account id const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, messageKey: payerAccount.participantCurrencyId.toString() }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, messageKey: 
payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME }) throw fspiopError } await TransferService.handlePayeeResponse(transferId, payload, action, fspiopError.toApiErrorObject(Config.ERROR_HANDLING)) const eventDetail = { functionality: TransferEventType.POSITION, action } // Key position abort with payer account id const payerAccount = await Participant.getAccountByNameAndCurrency(transfer.payerFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, messageKey: payerAccount.participantCurrencyId.toString() }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, messageKey: payerAccount.participantCurrencyId.toString(), hubName: Config.HUB_NAME }) // TODO(2556): I don't think we should emit an extra notification here // this is the case where the Payee sent an ABORT, so we don't need to tell them to abort throw fspiopError @@ -713,7 +713,7 @@ const getTransfer = async (error, messages) => { Util.breadcrumb(location, { path: 'validationFailed' }) if (!await Validator.validateParticipantByName(message.value.from)) { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `breakParticipantDoesntExist--${actionLetter}1`)) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) return true } @@ -721,13 +721,13 @@ const getTransfer = async (error, messages) => { if (!transfer) { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorTransferNotFound--${actionLetter}3`)) const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_ID_NOT_FOUND, 'Provided Transfer ID was not found on the server.') - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } if (!await Validator.validateParticipantTransferId(message.value.from, transferId)) { Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorNotTransferParticipant--${actionLetter}2`)) const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.CLIENT_ERROR) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME }) throw fspiopError } @@ -735,7 +735,7 @@ const getTransfer = async (error, messages) => { Util.breadcrumb(location, { path: 'validationPassed' }) Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackMessage--${actionLetter}4`)) message.value.content.payload = TransferObjectTransform.toFulfil(transfer) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch }) + await 
Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch, hubName: Config.HUB_NAME }) histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId }) return true } catch (err) { diff --git a/src/handlers/transfers/prepare.js b/src/handlers/transfers/prepare.js index 3e5630e13..c662ec13a 100644 --- a/src/handlers/transfers/prepare.js +++ b/src/handlers/transfers/prepare.js @@ -88,7 +88,8 @@ const processDuplication = async ({ consumerCommit, fspiopError: error.toApiErrorObject(Config.ERROR_HANDLING), eventDetail: { functionality, action }, - fromSwitch + fromSwitch, + hubName: Config.HUB_NAME }) throw error } @@ -105,10 +106,10 @@ const processDuplication = async ({ params.message.value.content.payload = TransferObjectTransform.toFulfil(transfer, isFx) params.message.value.content.uriParams = { id: ID } const eventDetail = { functionality, action: Action.PREPARE_DUPLICATE } - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch, hubName: Config.HUB_NAME }) } else { logger.info(Util.breadcrumb(location, `ignore--${actionLetter}3`)) - await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit }) + await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, hubName: Config.HUB_NAME }) } return true @@ -128,7 +129,8 @@ const savePreparedRequest = async ({ validationPassed, reasons, payload, isFx, f consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail: { functionality, action: Action.PREPARE }, - fromSwitch + fromSwitch, + hubName: Config.HUB_NAME }) throw fspiopError } @@ -174,7 +176,8 @@ const sendPositionPrepareMessage = async ({ isFx, payload, action, params }) => consumerCommit, eventDetail, messageKey, - topicNameOverride + topicNameOverride, + hubName: Config.HUB_NAME }) return true @@ -264,7 +267,8 @@ const prepare = async (error, messages) => { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail: { functionality, action }, - fromSwitch + fromSwitch, + hubName: Config.HUB_NAME }) throw fspiopError } diff --git a/test/fixtures.js b/test/fixtures.js index 409a4c613..b0bb5f910 100644 --- a/test/fixtures.js +++ b/test/fixtures.js @@ -24,6 +24,7 @@ const { randomUUID } = require('node:crypto') const { Enum } = require('@mojaloop/central-services-shared') +const Config = require('../src/lib/config') const ILP_PACKET = 'AYIBgQAAAAAAAASwNGxldmVsb25lLmRmc3AxLm1lci45T2RTOF81MDdqUUZERmZlakgyOVc4bXFmNEpLMHlGTFGCAUBQU0svMS4wCk5vbmNlOiB1SXlweUYzY3pYSXBFdzVVc05TYWh3CkVuY3J5cHRpb246IG5vbmUKUGF5bWVudC1JZDogMTMyMzZhM2ItOGZhOC00MTYzLTg0NDctNGMzZWQzZGE5OGE3CgpDb250ZW50LUxlbmd0aDogMTM1CkNvbnRlbnQtVHlwZTogYXBwbGljYXRpb24vanNvbgpTZW5kZXItSWRlbnRpZmllcjogOTI4MDYzOTEKCiJ7XCJmZWVcIjowLFwidHJhbnNmZXJDb2RlXCI6XCJpbnZvaWNlXCIsXCJkZWJpdE5hbWVcIjpcImFsaWNlIGNvb3BlclwiLFwiY3JlZGl0TmFtZVwiOlwibWVyIGNoYW50XCIsXCJkZWJpdElkZW50aWZpZXJcIjpcIjkyODA2MzkxXCJ9IgA' const CONDITION = '8x04dj-RKEtfjStajaKXKJ5eL1mWm9iG2ltEKvEDOHc' @@ -32,7 +33,7 @@ const FULFILMENT = 'uz0FAeutW6o8Mz7OmJh8ALX6mmsZCcIDOqtE01eo4uI' const DFSP1_ID = 'dfsp1' const DFSP2_ID = 'dfsp2' const FXP_ID = 'fxp' -const SWITCH_ID = 'switch' +const SWITCH_ID = Config.HUB_NAME const TOPICS = Object.freeze({ notificationEvent: 'topic-notification-event', diff --git a/test/unit/domain/position/fulfil.test.js b/test/unit/domain/position/fulfil.test.js index 226648dba..02f100492 100644 --- 
a/test/unit/domain/position/fulfil.test.js +++ b/test/unit/domain/position/fulfil.test.js @@ -29,6 +29,7 @@ const { Enum } = require('@mojaloop/central-services-shared') const Sinon = require('sinon') const { processPositionFulfilBin } = require('../../../../src/domain/position/fulfil') const { randomUUID } = require('crypto') +const Config = require('../../../../src/lib/config') const constructTransferCallbackTestData = (payerFsp, payeeFsp, transferState, eventAction, amount, currency) => { const transferId = randomUUID() @@ -429,13 +430,13 @@ Test('Fulfil domain', processPositionFulfilBinTest => { test.equal(result.notifyMessages[0].message.content.headers.accept, transferTestData1.message.value.content.headers.accept) test.equal(result.notifyMessages[0].message.content.headers['fspiop-destination'], transferTestData1.message.value.content.headers['fspiop-source']) - test.equal(result.notifyMessages[0].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(result.notifyMessages[0].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(result.notifyMessages[0].message.content.headers['content-type'], transferTestData1.message.value.content.headers['content-type']) test.equal(result.accumulatedTransferStates[transferTestData1.message.value.id], Enum.Transfers.TransferInternalState.INVALID) test.equal(result.notifyMessages[1].message.content.headers.accept, transferTestData2.message.value.content.headers.accept) test.equal(result.notifyMessages[1].message.content.headers['fspiop-destination'], transferTestData2.message.value.content.headers['fspiop-source']) - test.equal(result.notifyMessages[1].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(result.notifyMessages[1].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(result.notifyMessages[1].message.content.headers['content-type'], transferTestData2.message.value.content.headers['content-type']) test.equal(result.accumulatedTransferStates[transferTestData2.message.value.id], Enum.Transfers.TransferInternalState.INVALID) @@ -474,7 +475,7 @@ Test('Fulfil domain', processPositionFulfilBinTest => { test.equal(result.notifyMessages[0].message.content.headers.accept, transferTestData1.message.value.content.headers.accept) test.equal(result.notifyMessages[0].message.content.headers['fspiop-destination'], transferTestData1.message.value.content.headers['fspiop-source']) - test.equal(result.notifyMessages[0].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(result.notifyMessages[0].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(result.notifyMessages[0].message.content.headers['content-type'], transferTestData1.message.value.content.headers['content-type']) test.equal(result.accumulatedTransferStates[transferTestData1.message.value.id], Enum.Transfers.TransferInternalState.INVALID) diff --git a/test/unit/domain/position/fx-fulfil.test.js b/test/unit/domain/position/fx-fulfil.test.js index 76047ebbc..22a14c81f 100644 --- a/test/unit/domain/position/fx-fulfil.test.js +++ b/test/unit/domain/position/fx-fulfil.test.js @@ -29,6 +29,7 @@ const { Enum } = require('@mojaloop/central-services-shared') const Sinon = require('sinon') const { processPositionFxFulfilBin } = require('../../../../src/domain/position/fx-fulfil') const { randomUUID } = require('crypto') +const Config = require('../../../../src/lib/config') const constructFxTransferCallbackTestData = (initiatingFsp, 
counterPartyFsp) => { const commitRequestId = randomUUID() @@ -180,7 +181,7 @@ Test('Fx Fulfil domain', processPositionFxFulfilBinTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, fxTransferCallbackTestData3.message.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, fxTransferCallbackTestData3.message.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], fxTransferCallbackTestData3.message.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], fxTransferCallbackTestData3.message.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.accumulatedFxTransferStates[fxTransferCallbackTestData3.message.value.id], Enum.Transfers.TransferInternalState.ABORTED_REJECTED) diff --git a/test/unit/domain/position/fx-prepare.test.js b/test/unit/domain/position/fx-prepare.test.js index 653795a55..c9e6643de 100644 --- a/test/unit/domain/position/fx-prepare.test.js +++ b/test/unit/domain/position/fx-prepare.test.js @@ -30,6 +30,7 @@ const Sinon = require('sinon') const { processFxPositionPrepareBin } = require('../../../../src/domain/position/fx-prepare') const Logger = require('@mojaloop/central-services-logger') const { randomUUID } = require('crypto') +const Config = require('../../../../src/lib/config') const constructFxTransferTestData = (initiatingFsp, counterPartyFsp, sourceAmount, sourceCurrency, targetAmount, targetCurrency) => { const commitRequestId = randomUUID() @@ -211,7 +212,7 @@ Test('FX Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, fxTransferTestData3.message.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, fxTransferTestData3.message.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], fxTransferTestData3.message.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], fxTransferTestData3.message.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error') @@ -259,7 +260,7 @@ Test('FX Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[0].message.content.uriParams.id, fxTransferTestData1.message.value.id) test.equal(processedMessages.notifyMessages[0].message.content.headers.accept, fxTransferTestData1.message.value.content.headers.accept) test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-destination'], fxTransferTestData1.message.value.content.headers['fspiop-source']) - 
test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[0].message.content.headers['content-type'], fxTransferTestData1.message.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[0].message.content.payload.errorInformation.errorCode, '4001') @@ -269,7 +270,7 @@ Test('FX Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[1].message.content.uriParams.id, fxTransferTestData2.message.value.id) test.equal(processedMessages.notifyMessages[1].message.content.headers.accept, fxTransferTestData2.message.value.content.headers.accept) test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-destination'], fxTransferTestData2.message.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[1].message.content.headers['content-type'], fxTransferTestData2.message.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[1].message.content.payload.errorInformation.errorCode, '4001') test.equal(processedMessages.notifyMessages[1].message.content.payload.errorInformation.errorDescription, 'Payer FSP insufficient liquidity') @@ -278,7 +279,7 @@ Test('FX Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, fxTransferTestData3.message.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, fxTransferTestData3.message.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], fxTransferTestData3.message.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], fxTransferTestData3.message.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error') @@ -326,7 +327,7 @@ Test('FX Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[0].message.content.uriParams.id, fxTransferTestData1.message.value.id) test.equal(processedMessages.notifyMessages[0].message.content.headers.accept, fxTransferTestData1.message.value.content.headers.accept) test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-destination'], fxTransferTestData1.message.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[0].message.content.headers['content-type'], 
fxTransferTestData1.message.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[0].message.content.payload.errorInformation.errorCode, '4200') test.equal(processedMessages.notifyMessages[0].message.content.payload.errorInformation.errorDescription, 'Payer limit error') @@ -335,7 +336,7 @@ Test('FX Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[1].message.content.uriParams.id, fxTransferTestData2.message.value.id) test.equal(processedMessages.notifyMessages[1].message.content.headers.accept, fxTransferTestData2.message.value.content.headers.accept) test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-destination'], fxTransferTestData2.message.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[1].message.content.headers['content-type'], fxTransferTestData2.message.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[1].message.content.payload.errorInformation.errorCode, '4200') test.equal(processedMessages.notifyMessages[1].message.content.payload.errorInformation.errorDescription, 'Payer limit error') @@ -344,7 +345,7 @@ Test('FX Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, fxTransferTestData3.message.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, fxTransferTestData3.message.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], fxTransferTestData3.message.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], fxTransferTestData3.message.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error') @@ -408,7 +409,7 @@ Test('FX Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, fxTransferTestData3.message.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, fxTransferTestData3.message.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], fxTransferTestData3.message.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], fxTransferTestData3.message.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') 
test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error') diff --git a/test/unit/domain/position/prepare.test.js b/test/unit/domain/position/prepare.test.js index 23e68304e..19e4d6101 100644 --- a/test/unit/domain/position/prepare.test.js +++ b/test/unit/domain/position/prepare.test.js @@ -29,6 +29,7 @@ const { Enum } = require('@mojaloop/central-services-shared') const Sinon = require('sinon') const { processPositionPrepareBin } = require('../../../../src/domain/position/prepare') const Logger = require('@mojaloop/central-services-logger') +const Config = require('../../../../src/lib/config') // Each transfer is for $2.00 USD const transferMessage1 = { @@ -367,7 +368,7 @@ Test('Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, transferMessage3.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, transferMessage3.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], transferMessage3.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], transferMessage3.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error') @@ -429,7 +430,7 @@ Test('Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[0].message.content.uriParams.id, transferMessage1.value.id) test.equal(processedMessages.notifyMessages[0].message.content.headers.accept, transferMessage1.value.content.headers.accept) test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-destination'], transferMessage1.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[0].message.content.headers['content-type'], transferMessage1.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[0].message.content.payload.errorInformation.errorCode, '4001') @@ -439,7 +440,7 @@ Test('Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[1].message.content.uriParams.id, transferMessage2.value.id) test.equal(processedMessages.notifyMessages[1].message.content.headers.accept, transferMessage2.value.content.headers.accept) test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-destination'], transferMessage2.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[1].message.content.headers['content-type'], 
transferMessage2.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[1].message.content.payload.errorInformation.errorCode, '4001') test.equal(processedMessages.notifyMessages[1].message.content.payload.errorInformation.errorDescription, 'Payer FSP insufficient liquidity') @@ -448,7 +449,7 @@ Test('Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, transferMessage3.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, transferMessage3.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], transferMessage3.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], transferMessage3.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error') @@ -510,7 +511,7 @@ Test('Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[0].message.content.uriParams.id, transferMessage1.value.id) test.equal(processedMessages.notifyMessages[0].message.content.headers.accept, transferMessage1.value.content.headers.accept) test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-destination'], transferMessage1.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[0].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[0].message.content.headers['content-type'], transferMessage1.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[0].message.content.payload.errorInformation.errorCode, '4200') test.equal(processedMessages.notifyMessages[0].message.content.payload.errorInformation.errorDescription, 'Payer limit error') @@ -519,7 +520,7 @@ Test('Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[1].message.content.uriParams.id, transferMessage2.value.id) test.equal(processedMessages.notifyMessages[1].message.content.headers.accept, transferMessage2.value.content.headers.accept) test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-destination'], transferMessage2.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[1].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[1].message.content.headers['content-type'], transferMessage2.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[1].message.content.payload.errorInformation.errorCode, '4200') test.equal(processedMessages.notifyMessages[1].message.content.payload.errorInformation.errorDescription, 'Payer limit error') @@ -528,7 +529,7 @@ Test('Prepare domain', 
positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, transferMessage3.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, transferMessage3.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], transferMessage3.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], transferMessage3.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error') @@ -606,7 +607,7 @@ Test('Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, transferMessage3.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, transferMessage3.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], transferMessage3.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], transferMessage3.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error') @@ -691,7 +692,7 @@ Test('Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, transferMessage3.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, transferMessage3.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], transferMessage3.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], transferMessage3.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error') @@ -767,7 +768,7 @@ Test('Prepare domain', positionIndexTest => { test.equal(processedMessages.notifyMessages[2].message.content.uriParams.id, transferMessage3.value.id) test.equal(processedMessages.notifyMessages[2].message.content.headers.accept, transferMessage3.value.content.headers.accept) test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-destination'], 
transferMessage3.value.content.headers['fspiop-source']) - test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Enum.Http.Headers.FSPIOP.SWITCH.value) + test.equal(processedMessages.notifyMessages[2].message.content.headers['fspiop-source'], Config.HUB_NAME) test.equal(processedMessages.notifyMessages[2].message.content.headers['content-type'], transferMessage3.value.content.headers['content-type']) test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorCode, '2001') test.equal(processedMessages.notifyMessages[2].message.content.payload.errorInformation.errorDescription, 'Internal server error')
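
Note on the change above: the hard-coded 'switch' identity is replaced by a configurable hub participant (HUB_PARTICIPANT.ID / HUB_PARTICIPANT.NAME, surfaced as Config.HUB_NAME), that value is now passed into every Kafka.proceed call as hubName, and the position-domain unit tests assert it in place of Enum.Http.Headers.FSPIOP.SWITCH.value. The short JavaScript sketch below is a minimal illustration of how those pieces are expected to fit together; the env-var fallbacks, the buildHubHeaders() helper, and the standalone kafkaProceed() wrapper name are assumptions for illustration, not code from this patch.

'use strict'

/**
 * Illustrative sketch only, not part of the patch. It mirrors how the change
 * is expected to be consumed: the hub identity comes from the new
 * HUB_PARTICIPANT config block instead of a hard-coded 'switch' constant,
 * and Kafka.proceed receives it as `hubName`.
 * Assumptions (not taken from this patch): the env-var fallbacks and the
 * helper names below are hypothetical.
 */

const RC = {
  HUB_PARTICIPANT: {
    ID: Number(process.env.HUB_PARTICIPANT_ID || 1),
    NAME: process.env.HUB_PARTICIPANT_NAME || 'Hub'
  }
}

const Config = {
  HUB_ID: RC.HUB_PARTICIPANT.ID,
  HUB_NAME: RC.HUB_PARTICIPANT.NAME
}

// Callbacks generated by the hub ("fromSwitch") should carry the configured
// hub name as fspiop-source, which is what the updated unit tests assert
// (Config.HUB_NAME replaces Enum.Http.Headers.FSPIOP.SWITCH.value).
function buildHubHeaders (destinationFsp) {
  return {
    'fspiop-source': Config.HUB_NAME,
    'fspiop-destination': destinationFsp
  }
}

// Thin wrapper in the spirit of FxFulfilService.kafkaProceed(): the hub name
// is injected once, and individual call sites can still override it via
// kafkaOpts because the spread comes after the default.
async function kafkaProceed (Kafka, kafkaConfig, params, kafkaOpts) {
  return Kafka.proceed(kafkaConfig, params, {
    hubName: Config.HUB_NAME,
    ...kafkaOpts
  })
}

module.exports = { Config, buildHubHeaders, kafkaProceed }

In the patch itself the same pattern appears inline at every Kafka.proceed call in src/handlers/transfers/handler.js and src/handlers/transfers/prepare.js ({ ..., hubName: Config.HUB_NAME }), and test/fixtures.js now derives SWITCH_ID from Config.HUB_NAME so the fixtures stay in sync with whatever hub name is configured.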