Skip to content

Commit

Permalink
feat: fulfil obligation tracking (#1063)
Browse files Browse the repository at this point in the history
* feat(csi-22): add proxy lib to handlers

* diff

* add

* int tests

* fix hanging int tests

* fixes?

* unit fixes?

* coverage

* feat: add zero adjustment for prepare position batch

* feat: refactor proxy cache integration

* feat: restore default

* feat: minor optimization

* test: update coverage

* test: remove try-catch

* fix: fix disconnect error

* feat(prepare-position): add proxy substitution and zero adjustment logic

* fix: remove uneeded async

* feat: proxy cache update (#1061)

* addressed comments

* chore: refactor

* test: add unit tests

* chore: minor refactor

* chore: lint

* feat: revert prepare hadnler change, update test coverage

* feat: update docker compose and default config for docker

* chore: remove commented code

* test: update test

* test: update test

* feat: added proxy check in fulfil handler

* fix: derive fn

* fix: checkSameCreditorDebtorProxy

* unit tests

* unit tests

* int tests

* fix: unit tests

* chore: added unit tests for proxyCache deriveCurrencyId function

* chore: added coverage

* stuff

* some int tests

* comments

* pass object

* messy but working

* coverage

* hanging int test?

* fix int tests

* feat: refactor

* clarify naming

* comment

* feat: added more test coverage

* fixes?

* dep update

* fix: int tests

* fix: disable tests around fspiop header validation in fulfil

* fix: int tests

* chore: disabled a fulfil test due to and issue in position handler

* fix: int tests

* chore: addressed pr comment

* fix: lint

* fix: integration tests

---------

Co-authored-by: Kevin Leyow <[email protected]>
Co-authored-by: Steven Oderayi <[email protected]>
  • Loading branch information
3 people authored Aug 6, 2024
1 parent a92187f commit 2d7abfe
Show file tree
Hide file tree
Showing 13 changed files with 961 additions and 121 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ If you want to run integration tests in a repetitive manner, you can startup the
Start containers required for Integration Tests

```bash
docker-compose -f docker-compose.yml up -d mysql kafka init-kafka kafka-debug-console
docker-compose -f docker-compose.yml up -d mysql kafka init-kafka kafka-debug-console redis
```

Run wait script which will report once all required containers are up and running
Expand Down Expand Up @@ -242,7 +242,7 @@ If you want to run override position topic tests you can repeat the above and us
#### For running integration tests for batch processing interactively
- Run dependecies
```
docker-compose up -d mysql kafka init-kafka kafka-debug-console
docker-compose up -d mysql kafka init-kafka kafka-debug-console redis
npm run wait-4-docker
```
- Run central-ledger services
Expand Down
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
"jsdoc": "4.0.3",
"jsonpath": "1.1.1",
"nodemon": "3.1.4",
"npm-check-updates": "17.0.0",
"npm-check-updates": "17.0.3",
"nyc": "17.0.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
Expand Down
113 changes: 72 additions & 41 deletions src/domain/fx/cyril.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
const Metrics = require('@mojaloop/central-services-metrics')
const { Enum } = require('@mojaloop/central-services-shared')
const TransferModel = require('../../models/transfer/transfer')
const ParticipantFacade = require('../../models/participant/facade')
const { fxTransfer, watchList } = require('../../models/fxTransfer')
const Config = require('../../lib/config')
const ProxyCache = require('../../lib/proxyCache')

const checkIfDeterminingTransferExistsForTransferMessage = async (payload) => {
// Does this determining transfer ID appear on the watch list?
Expand Down Expand Up @@ -242,17 +242,15 @@ const processFulfilMessage = async (transferId, payload, transfer) => {
// Create obligation between FXP and FX requesting party in currency of reservation
// Find out the participantCurrencyId of the initiatingFsp
// The following is hardcoded for Payer side conversion with SEND amountType.
const participantCurrency = await ParticipantFacade.getByNameAndCurrency(
fxTransferRecord.initiatingFspName,
fxTransferRecord.targetCurrency,
Enum.Accounts.LedgerAccountType.POSITION
)
result.positionChanges.push({
isFxTransferStateChange: false,
transferId,
participantCurrencyId: participantCurrency.participantCurrencyId,
amount: -fxTransferRecord.targetAmount
})
const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(fxTransferRecord.initiatingFspName, fxTransferRecord.targetCurrency)
if (proxyParticipantAccountDetails.participantCurrencyId) {
result.positionChanges.push({
isFxTransferStateChange: false,
transferId,
participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId,
amount: -fxTransferRecord.targetAmount
})
}
// TODO: Send PATCH notification to FXP
}

Expand All @@ -262,12 +260,15 @@ const processFulfilMessage = async (transferId, payload, transfer) => {
sendingFxpExists = true
sendingFxpRecord = fxTransferRecord
// Create obligation between FX requesting party and FXP in currency of reservation
result.positionChanges.push({
isFxTransferStateChange: true,
commitRequestId: fxTransferRecord.commitRequestId,
participantCurrencyId: fxTransferRecord.counterPartyFspSourceParticipantCurrencyId,
amount: -fxTransferRecord.sourceAmount
})
const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(fxTransferRecord.counterPartyFspName, fxTransferRecord.sourceCurrency)
if (proxyParticipantAccountDetails.participantCurrencyId) {
result.positionChanges.push({
isFxTransferStateChange: true,
commitRequestId: fxTransferRecord.commitRequestId,
participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId,
amount: -fxTransferRecord.sourceAmount
})
}
// TODO: Send PATCH notification to FXP
}
}
Expand All @@ -279,34 +280,64 @@ const processFulfilMessage = async (transferId, payload, transfer) => {

if (sendingFxpExists && receivingFxpExists) {
// If we have both a sending and a receiving FXP, Create obligation between sending and receiving FXP in currency of transfer.
result.positionChanges.push({
isFxTransferStateChange: true,
commitRequestId: receivingFxpRecord.commitRequestId,
participantCurrencyId: receivingFxpRecord.counterPartyFspSourceParticipantCurrencyId,
amount: -receivingFxpRecord.sourceAmount
})
const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(receivingFxpRecord.counterPartyFspName, receivingFxpRecord.sourceCurrency)
if (proxyParticipantAccountDetails.participantCurrencyId) {
result.positionChanges.push({
isFxTransferStateChange: true,
commitRequestId: receivingFxpRecord.commitRequestId,
participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId,
amount: -receivingFxpRecord.sourceAmount
})
}
} else if (sendingFxpExists) {
// If we have a sending FXP, Create obligation between FXP and creditor party to the transfer in currency of FX transfer
// Get participantCurrencyId for transfer.payeeParticipantId/transfer.payeeFsp and sendingFxpRecord.targetCurrency
const participantCurrency = await ParticipantFacade.getByNameAndCurrency(
transfer.payeeFsp,
sendingFxpRecord.targetCurrency,
Enum.Accounts.LedgerAccountType.POSITION
)
result.positionChanges.push({
isFxTransferStateChange: false,
transferId,
participantCurrencyId: participantCurrency.participantCurrencyId,
amount: -sendingFxpRecord.targetAmount
})
const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(transfer.payeeFsp, sendingFxpRecord.targetCurrency)
if (proxyParticipantAccountDetails.participantCurrencyId) {
let isPositionChange = false
if (proxyParticipantAccountDetails.inScheme) {
isPositionChange = true
} else {
// We are not expecting this. Payee participant is a proxy and have an account in the targetCurrency.
// In this case we need to check if FXP is also a proxy and have the same account as payee.
const proxyParticipantAccountDetails2 = await ProxyCache.getProxyParticipantAccountDetails(sendingFxpRecord.counterPartyFspName, sendingFxpRecord.targetCurrency)
if (!proxyParticipantAccountDetails2.inScheme && (proxyParticipantAccountDetails.participantCurrencyId !== proxyParticipantAccountDetails2.participantCurrencyId)) {
isPositionChange = true
}
}
if (isPositionChange) {
result.positionChanges.push({
isFxTransferStateChange: false,
transferId,
participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId,
amount: -sendingFxpRecord.targetAmount
})
}
}
} else if (receivingFxpExists) {
// If we have a receiving FXP, Create obligation between debtor party to the transfer and FXP in currency of transfer
result.positionChanges.push({
isFxTransferStateChange: true,
commitRequestId: receivingFxpRecord.commitRequestId,
participantCurrencyId: receivingFxpRecord.counterPartyFspSourceParticipantCurrencyId,
amount: -receivingFxpRecord.sourceAmount
})
const proxyParticipantAccountDetails = await ProxyCache.getProxyParticipantAccountDetails(receivingFxpRecord.counterPartyFspName, receivingFxpRecord.sourceCurrency)
if (proxyParticipantAccountDetails.participantCurrencyId) {
let isPositionChange = false
if (proxyParticipantAccountDetails.inScheme) {
isPositionChange = true
} else {
// We are not expecting this. FXP participant is a proxy and have an account in the sourceCurrency.
// In this case we need to check if Payer is also a proxy and have the same account as FXP.
const proxyParticipantAccountDetails2 = await ProxyCache.getProxyParticipantAccountDetails(transfer.payerFsp, receivingFxpRecord.sourceCurrency)
if (!proxyParticipantAccountDetails2.inScheme && (proxyParticipantAccountDetails.participantCurrencyId !== proxyParticipantAccountDetails2.participantCurrencyId)) {
isPositionChange = true
}
}
if (isPositionChange) {
result.positionChanges.push({
isFxTransferStateChange: true,
commitRequestId: receivingFxpRecord.commitRequestId,
participantCurrencyId: proxyParticipantAccountDetails.participantCurrencyId,
amount: -receivingFxpRecord.sourceAmount
})
}
}
}

// TODO: Remove entries from watchlist
Expand Down
8 changes: 4 additions & 4 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const participantFacade = require('../../models/participant/facade')
* @returns {results} - Returns a list of bins with results or throws an error if failed
*/
const processBins = async (bins, trx) => {
let notifyMessages = []
let followupMessages = []
let limitAlarms = []

// Get transferIdList, reservedActionTransferIdList and commitRequestId for actions PREPARE, FX_PREPARE, FX_RESERVE, COMMIT and RESERVE
const { transferIdList, reservedActionTransferIdList, commitRequestIdList } = await _getTransferIdList(bins)

Expand Down Expand Up @@ -104,10 +108,6 @@ const processBins = async (bins, trx) => {
reservedActionTransferIdList
)

let notifyMessages = []
let followupMessages = []
let limitAlarms = []

// For each account-bin in the list
for (const accountID in bins) {
const accountBin = bins[accountID]
Expand Down
Loading

0 comments on commit 2d7abfe

Please sign in to comment.