feat: parameterize switch id
oderayi committed Jun 24, 2024
1 parent ca5dca5 commit 11e68c6
Showing 23 changed files with 327 additions and 179 deletions.
5 changes: 4 additions & 1 deletion docker/ml-api-adapter/default.json
@@ -1,4 +1,8 @@
{
"HUB_PARTICIPANT": {
"ID": 1,
"NAME": "Hub"
},
"PORT": 3000,
"HOSTNAME": "http://ml-api-adapter",
"ENDPOINT_SOURCE_URL": "http://host.docker.internal:3001",
@@ -13,7 +17,6 @@
},
"JWS": {
"JWS_SIGN": false,
"FSPIOP_SOURCE_TO_SIGN": "switch",
"JWS_SIGNING_KEY_PATH": "secrets/jwsSigningKey.key"
}
},
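The new HUB_PARTICIPANT block is presumably mirrored in the service's own config (the remaining changed files are not shown here) and is what the code below reads as Config.HUB_NAME. A minimal sketch of how a config module might surface it — the file path, module shape, and use of plain fs are assumptions for illustration, not taken from this commit:

// Hypothetical config wiring — path and exported names are assumptions for this sketch
const fs = require('node:fs')

const defaults = JSON.parse(fs.readFileSync('./config/default.json', 'utf8'))

module.exports = {
  HUB_ID: defaults.HUB_PARTICIPANT.ID,     // 1 in the file above
  HUB_NAME: defaults.HUB_PARTICIPANT.NAME  // 'Hub' — replaces the hard-coded 'switch' identity
}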
42 changes: 24 additions & 18 deletions package-lock.json

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
@@ -101,7 +101,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.3.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.4.0-snapshot.17",
"@mojaloop/central-services-shared": "18.5.0-snapshot.2",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.5",
"@mojaloop/event-sdk": "14.1.1",
@@ -118,7 +118,7 @@
"docdash": "2.0.2",
"event-stream": "4.0.1",
"five-bells-condition": "5.0.1",
"glob": "10.4.1",
"glob": "10.4.2",
"hapi-auth-basic": "5.0.0",
"hapi-auth-bearer-token": "8.0.0",
"hapi-swagger": "17.2.1",
@@ -139,7 +139,7 @@
"get-port": "5.1.1",
"jsdoc": "4.0.3",
"jsonpath": "1.1.1",
"nodemon": "3.1.3",
"nodemon": "3.1.4",
"npm-check-updates": "16.14.20",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
4 changes: 2 additions & 2 deletions src/domain/position/fulfil.js
@@ -129,7 +129,7 @@ const _handleIncorrectTransferState = (binItem, payeeFsp, transferId, accumulate
// set destination to payeefsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = payeeFsp
- headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
+ headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(
@@ -151,7 +151,7 @@ const _handleIncorrectTransferState = (binItem, payeeFsp, transferId, accumulate
return Utility.StreamingProtocol.createMessage(
transferId,
payeeFsp,
- Enum.Http.Headers.FSPIOP.SWITCH.value,
+ Config.HUB_NAME,
metadata,
headers,
fspiopError,
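The fulfil bin above — and the fx-fulfil, fx-prepare, and prepare bins that follow — all apply the same substitution when building error notifications: the FSPIOP source header switches from the fixed enum value to the configured hub name. A condensed sketch of that step, wrapped in a hypothetical helper (the helper name is illustrative; Enum and Config are the modules already required in these files):

// Condensed sketch of the header rewrite used across the position bins
const buildErrorHeaders = (binItem, payeeFsp) => {
  const headers = { ...binItem.message.value.content.headers }
  headers[Enum.Http.Headers.FSPIOP.DESTINATION] = payeeFsp
  headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME // was Enum.Http.Headers.FSPIOP.SWITCH.value ('switch')
  delete headers['content-length'] // drop the now-stale content-length
  return headers
}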
4 changes: 2 additions & 2 deletions src/domain/position/fx-fulfil.js
@@ -41,7 +41,7 @@ const processPositionFxFulfilBin = async (
// set destination to counterPartyFsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = counterPartyFsp
- headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
+ headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

// TODO: Confirm if this setting transferStateId to ABORTED_REJECTED is correct. There is no such logic in the fulfil handler.
@@ -67,7 +67,7 @@ const processPositionFxFulfilBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
commitRequestId,
counterPartyFsp,
- Enum.Http.Headers.FSPIOP.SWITCH.value,
+ Config.HUB_NAME,
metadata,
headers,
fspiopError,
12 changes: 6 additions & 6 deletions src/domain/position/fx-prepare.js
@@ -65,7 +65,7 @@ const processFxPositionPrepareBin = async (
// set destination to initiatingFsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = fxTransfer.initiatingFsp
- headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
+ headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createFSPIOPError(
@@ -88,7 +88,7 @@ const processFxPositionPrepareBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
fxTransfer.commitRequestId,
fxTransfer.initiatingFsp,
- Enum.Http.Headers.FSPIOP.SWITCH.value,
+ Config.HUB_NAME,
metadata,
headers,
fspiopError,
@@ -107,7 +107,7 @@ const processFxPositionPrepareBin = async (
// set destination to payerfsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = fxTransfer.initiatingFsp
- headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
+ headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createFSPIOPError(
@@ -130,7 +130,7 @@ const processFxPositionPrepareBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
fxTransfer.commitRequestId,
fxTransfer.initiatingFsp,
- Enum.Http.Headers.FSPIOP.SWITCH.value,
+ Config.HUB_NAME,
metadata,
headers,
fspiopError,
@@ -149,7 +149,7 @@ const processFxPositionPrepareBin = async (
// set destination to payerfsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = fxTransfer.initiatingFsp
- headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
+ headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createFSPIOPError(
@@ -172,7 +172,7 @@ const processFxPositionPrepareBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
fxTransfer.commitRequestId,
fxTransfer.initiatingFsp,
- Enum.Http.Headers.FSPIOP.SWITCH.value,
+ Config.HUB_NAME,
metadata,
headers,
fspiopError,
12 changes: 6 additions & 6 deletions src/domain/position/prepare.js
@@ -67,7 +67,7 @@ const processPositionPrepareBin = async (
// set destination to payerfsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payerFsp
- headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
+ headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createFSPIOPError(
@@ -90,7 +90,7 @@ const processPositionPrepareBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payerFsp,
- Enum.Http.Headers.FSPIOP.SWITCH.value,
+ Config.HUB_NAME,
metadata,
headers,
fspiopError,
@@ -109,7 +109,7 @@ const processPositionPrepareBin = async (
// set destination to payerfsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payerFsp
- headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
+ headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createFSPIOPError(
@@ -132,7 +132,7 @@ const processPositionPrepareBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payerFsp,
- Enum.Http.Headers.FSPIOP.SWITCH.value,
+ Config.HUB_NAME,
metadata,
headers,
fspiopError,
@@ -151,7 +151,7 @@ const processPositionPrepareBin = async (
// set destination to payerfsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payerFsp
- headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
+ headers[Enum.Http.Headers.FSPIOP.SOURCE] = Config.HUB_NAME
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createFSPIOPError(
@@ -174,7 +174,7 @@ const processPositionPrepareBin = async (
resultMessage = Utility.StreamingProtocol.createMessage(
transfer.transferId,
transfer.payerFsp,
- Enum.Http.Headers.FSPIOP.SWITCH.value,
+ Config.HUB_NAME,
metadata,
headers,
fspiopError,
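Each abort branch in the prepare and fx-prepare bins then wraps the error in a streaming-protocol message whose from field is the hub name rather than the old switch constant. A condensed sketch of that call as it appears above; arguments hidden by the collapsed hunks are elided here as well:

// Notification message for the payer FSP, now sourced from the configured hub
resultMessage = Utility.StreamingProtocol.createMessage(
  transfer.transferId, // message id
  transfer.payerFsp,   // to
  Config.HUB_NAME,     // from — previously Enum.Http.Headers.FSPIOP.SWITCH.value
  metadata,
  headers,
  fspiopError
  // ...remaining arguments unchanged (not visible in the collapsed hunk)
)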
10 changes: 5 additions & 5 deletions src/handlers/bulk/fulfil/handler.js
@@ -110,7 +110,7 @@ const bulkFulfil = async (error, messages) => {
Logger.isErrorEnabled && Logger.error(Util.breadcrumb(location, `callbackErrorModified--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST)
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}

@@ -134,7 +134,7 @@ const bulkFulfil = async (error, messages) => {
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
params.message.value.content.uriParams = { id: bulkTransferId }

- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}
try {
@@ -240,15 +240,15 @@ const bulkFulfil = async (error, messages) => {
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
params.message.value.content.uriParams = { id: bulkTransferId }

- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorGeneric--${actionLetter}8`))

const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
params.message.value.content.uriParams = { id: bulkTransferId }

- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: validationFspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: validationFspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw validationFspiopError
}
} catch (err) {
@@ -293,7 +293,7 @@ const sendIndividualTransfer = async (message, messageId, kafkaTopic, headers, p
value: Util.StreamingProtocol.createMessage(messageId, headers[Enum.Http.Headers.FSPIOP.DESTINATION], headers[Enum.Http.Headers.FSPIOP.SOURCE], metadata, headers, dataUri, { id: transferId })
}
params = { message: msg, kafkaTopic, consumer: Consumer, producer: Producer }
- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, eventDetail })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, eventDetail, hubName: Config.HUB_NAME })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
}

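In the bulk handlers the hub identity is additionally threaded into Kafka.proceed as a new hubName option — presumably why @mojaloop/central-services-shared is bumped to 18.5.0-snapshot.2 in package.json. The call shape, as it appears in these handlers:

// Error callback proceeds through Kafka with the hub name passed explicitly
await Kafka.proceed(Config.KAFKA_CONFIG, params, {
  consumerCommit,
  fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING),
  eventDetail,
  fromSwitch,
  hubName: Config.HUB_NAME // new option; assumption: the shared Kafka util uses it where it previously fell back to 'switch'
})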
10 changes: 5 additions & 5 deletions src/handlers/bulk/get/handler.js
@@ -88,7 +88,7 @@ const getBulkTransfer = async (error, messages) => {

if (!(await Validator.validateParticipantByName(message.value.from)).isValid) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `breakParticipantDoesntExist--${actionLetter}1`))
- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, hubName: Config.HUB_NAME })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
}
@@ -97,7 +97,7 @@
if (!bulkTransferLight) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorBulkTransferNotFound--${actionLetter}3`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.BULK_TRANSFER_ID_NOT_FOUND, 'Provided Bulk Transfer ID was not found on the server.')
- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}
// The SD says this should be 404 response which I think will not be constent with single transfers
@@ -106,7 +106,7 @@
if (![participants.payeeFsp, participants.payerFsp].includes(message.value.from)) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackErrorNotBulkTransferParticipant--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.CLIENT_ERROR)
- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
throw fspiopError
}
const isPayeeRequest = participants.payeeFsp === message.value.from
@@ -129,9 +129,9 @@
}
message.value.content.payload = payload
if (fspiopError) {
- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(ERROR_HANDLING), eventDetail, fromSwitch })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(ERROR_HANDLING), eventDetail, fromSwitch, hubName: Config.HUB_NAME })
} else {
- await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch })
+ await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch, hubName: Config.HUB_NAME })
}
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true