Skip to content

Commit

Permalink
fix: fx fulfil header validation (#1084)
Browse files Browse the repository at this point in the history
* fix: fx fulfil

* chore(snapshot): 17.8.0-snapshot.10

* chore(snapshot): 17.8.0-snapshot.11

* chore(snapshot): 17.8.0-snapshot.12

* fix: fx fulfil proxy
  • Loading branch information
vijayg10 authored Aug 23, 2024
1 parent 6253688 commit 0f5192c
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 63 deletions.
1 change: 0 additions & 1 deletion .ncurc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ reject: [
"sinon",
# glob >= 11 requires node >= 20
"glob",
"@mojaloop/central-services-shared"
]
1 change: 1 addition & 0 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
"COMMIT": null,
"BULK_COMMIT": null,
"RESERVE": null,
"FX_RESERVE": "topic-transfer-position-batch",
"TIMEOUT_RESERVED": null,
"FX_TIMEOUT_RESERVED": "topic-transfer-position-batch",
"ABORT": null,
Expand Down
70 changes: 34 additions & 36 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
"version": "17.8.0-snapshot.10",
"version": "17.8.0-snapshot.12",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
Expand Down Expand Up @@ -92,7 +92,7 @@
"@mojaloop/central-services-health": "15.0.0",
"@mojaloop/central-services-logger": "11.5.1",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.6.3",
"@mojaloop/central-services-shared": "18.7.2",
"@mojaloop/central-services-stream": "11.3.1",
"@mojaloop/database-lib": "11.0.6",
"@mojaloop/event-sdk": "14.1.1",
Expand Down
14 changes: 9 additions & 5 deletions src/domain/position/abort.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ const processPositionAbortBin = async (
for (const positionChange of cyrilResult.positionChanges) {
if (positionChange.isFxTransferStateChange) {
// Construct notification message for fx transfer state change
const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, Config.HUB_NAME, positionChange.notifyTo, Enum.Events.Event.Action.FX_ABORT)
const resultMessage = _constructAbortResultMessage(binItem, positionChange.commitRequestId, Config.HUB_NAME, positionChange.notifyTo)
resultMessages.push({ binItem, message: resultMessage })
} else {
// Construct notification message for transfer state change
const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, Config.HUB_NAME, positionChange.notifyTo, Enum.Events.Event.Action.ABORT)
const resultMessage = _constructAbortResultMessage(binItem, positionChange.transferId, Config.HUB_NAME, positionChange.notifyTo)
resultMessages.push({ binItem, message: resultMessage })
}
}
Expand Down Expand Up @@ -125,9 +125,13 @@ const processPositionAbortBin = async (
}
}

const _constructAbortResultMessage = (binItem, id, from, notifyTo, action) => {
const _constructAbortResultMessage = (binItem, id, from, notifyTo) => {
let apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.PAYEE_REJECTION
if (binItem.message?.value.metadata.event.action === Enum.Events.Event.Action.FX_ABORT_VALIDATION) {
apiErrorCode = ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR
}
const fspiopError = ErrorHandler.Factory.createFSPIOPError(
ErrorHandler.Enums.FSPIOPErrorCodes.PAYEE_REJECTION, // TODO: Need clarification on this
apiErrorCode,
null,
null,
null,
Expand All @@ -144,7 +148,7 @@ const _constructAbortResultMessage = (binItem, id, from, notifyTo, action) => {
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
id,
Enum.Kafka.Topics.POSITION,
action,
binItem.message?.value.metadata.event.action, // This will be replaced anyway in Kafka.produceGeneralMessage function
state
)
const resultMessage = Utility.StreamingProtocol.createMessage(
Expand Down
34 changes: 22 additions & 12 deletions src/handlers/transfers/FxFulfilService.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class FxFulfilService {
}

async getFxTransferDetails(commitRequestId, functionality) {
const transfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestId(commitRequestId)
const transfer = await this.FxTransferModel.fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(commitRequestId)

if (!transfer) {
const fspiopError = fspiopErrorFactory.fxTransferNotFound()
Expand All @@ -79,10 +79,10 @@ class FxFulfilService {
async validateHeaders({ transfer, headers, payload }) {
let fspiopError = null

if (headers[SOURCE]?.toLowerCase() !== transfer.counterPartyFspName.toLowerCase()) {
if (!transfer.counterPartyFspIsProxy && (headers[SOURCE]?.toLowerCase() !== transfer.counterPartyFspName.toLowerCase())) {
fspiopError = fspiopErrorFactory.fxHeaderSourceValidationError()
}
if (headers[DESTINATION]?.toLowerCase() !== transfer.initiatingFspName.toLowerCase()) {
if (!transfer.initiatingFspIsProxy && (headers[DESTINATION]?.toLowerCase() !== transfer.initiatingFspName.toLowerCase())) {
fspiopError = fspiopErrorFactory.fxHeaderDestinationValidationError()
}

Expand All @@ -97,16 +97,31 @@ class FxFulfilService {
// Lets handle the abort validation and change the fxTransfer state to reflect this
await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, eventDetail.action, apiFSPIOPError)

// Publish message to FX Position Handler
await this._handleAbortValidation(transfer, apiFSPIOPError, eventDetail)
throw fspiopError
}
}

async _handleAbortValidation(transfer, apiFSPIOPError, eventDetail) {
const cyrilResult = await this.cyril.processFxAbortMessage(transfer.commitRequestId)

this.params.message.value.content.context = {
...this.params.message.value.content.context,
cyrilResult
}
if (cyrilResult.positionChanges.length > 0) {
const participantCurrencyId = cyrilResult.positionChanges[0].participantCurrencyId
await this.kafkaProceed({
consumerCommit,
fspiopError: apiFSPIOPError,
eventDetail,
fromSwitch,
toDestination: transfer.initiatingFspName,
// The message key doesn't matter here, as there are no position changes for FX Fulfil
messageKey: transfer.counterPartyFspSourceParticipantCurrencyId.toString()
messageKey: participantCurrencyId.toString(),
topicNameOverride: this.Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.FX_ABORT
})
} else {
const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError('Invalid cyril result')
throw fspiopError
}
}
Expand Down Expand Up @@ -231,12 +246,7 @@ class FxFulfilService {
this.log.warn('callbackErrorInvalidFulfilment', { eventDetail, apiFSPIOPError, transfer, payload })
await this.FxTransferModel.fxTransfer.saveFxFulfilResponse(transfer.commitRequestId, payload, eventDetail.action, apiFSPIOPError)

await this.kafkaProceed({
consumerCommit,
fspiopError: apiFSPIOPError,
eventDetail,
messageKey: transfer.counterPartyFspTargetParticipantCurrencyId.toString()
})
await this._handleAbortValidation(transfer, apiFSPIOPError, eventDetail)
throw fspiopError
}

Expand Down
2 changes: 2 additions & 0 deletions src/models/fxTransfer/fxTransfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ const getAllDetailsByCommitRequestId = async (commitRequestId) => {
'fxTransfer.*',
'da.participantId AS initiatingFspParticipantId',
'da.name AS initiatingFspName',
'da.isProxy AS initiatingFspIsProxy',
// 'pc21.participantCurrencyId AS counterPartyFspSourceParticipantCurrencyId',
// 'pc22.participantCurrencyId AS counterPartyFspTargetParticipantCurrencyId',
'tp21.participantCurrencyId AS counterPartyFspSourceParticipantCurrencyId',
'tp22.participantCurrencyId AS counterPartyFspTargetParticipantCurrencyId',
'ca.participantId AS counterPartyFspParticipantId',
'ca.name AS counterPartyFspName',
'ca.isProxy AS counterPartyFspIsProxy',
'tsc.fxTransferStateChangeId',
'tsc.transferStateId AS transferState',
'tsc.reason AS reason',
Expand Down
Loading

0 comments on commit 0f5192c

Please sign in to comment.