Skip to content

Commit

Permalink
feat(mojaloop/#3524): add position fulfil to binprocessor (#990)
Browse files Browse the repository at this point in the history
* feat: added skeleton and comments for prepare bin implementation

* chore: some changes

* feat: added new position handler v2

* feat: added some functionality to new position handler v2

* feat: added some todos

* feat: changed new handler name

* fix: lint

* feat: add some changes

* fix: lint

* chore: added copyright header

* feat: added some logic

* fix: lint

* feat: added unit tests

* feat: some improvements

* feat: added bin processor logic

* fix: lint

* feat; refactor

* feat: added limit

* fix: unit tests

* feat: added integration test

* feat: refactor

* fix: unit tests

* fix: integration tests

* feat: add position prepare bin logic (#970)

* feat: add position prepare bin logic

* edit

* lint

* dep

* chore: changes

* chore: commet

* lint

* chore: refactor

* chore: unit tests

* chore: change

* chore: comment

* chore: test

* chore: more test

* chore: test?

* chore: remove

* chore: rework domain logic

* chore: test

* fix: position calculation

* fix: position change data

* fix; prepare domain function

* chore: fix

---------

Co-authored-by: Vijay <[email protected]>

* chore: add tests (#972)

* chore: add tests

* chore: unit

* sleep

* chore: clear

* chore: test

* enum

* fix: deadlock in tests

---------

Co-authored-by: Vijay <[email protected]>

* fix: metrics

* fix: some issue with bin processor

* feat(mojaloop/#3489): add cached calls for participant currency, update tests (#973)

* chore: add tests

* chore: unit

* sleep

* chore: clear

* chore: test

* enum

* feat: add cached calls for participant currency

* chore: change

* chore: unit

* chore: unit

* lock

* test

* fix: unit test

---------

Co-authored-by: Vijay <[email protected]>

* chore: improve coverage and edit metric and query name (#974)

* chore: coverage

* lint

* chore: coverage

* more coverage

* chore: test

* chore: coverage

* chore: metric name

* chore: metric name

* chore: update query name

* chore: remove log

* chore(snapshot): 17.3.0-snapshot.0

* fix: test downgrade stream lib

* chore(snapshot): 17.3.0-snapshot.1

* fix: notification event action

* chore(snapshot): 17.3.0-snapshot.2

* fix: notification event

* chore(snapshot): 17.3.0-snapshot.3

* fix: notification messages

* chore(snapshot): 17.3.0-snapshot.4

* chore: updated conf

* chore: update deps

* chore(snapshot): 17.3.0-snapshot.5

* fix(mojaloop/#3529): fix high latency  (#981)

* fix: upgrade central-services-stream library

* chore(snapshot): v17.2.1-snapshot.0

* chore(snapshot): 17.2.1-snapshot.1

* chore(snapshot): 17.2.1-snapshot.2

* chore(release): 17.2.1 [skip ci]

* feat(mojaloop/#3489): add negative integration tests (#975)

* chore: add support for unique transfers and dual currencies in batch

* check in for a negative test case

* uncommented tests

* added test for mixed currencies

* updated test for mixed currencies

* chore: dep and lint

---------

Co-authored-by: Kevin Leyow <[email protected]>

* Merge branch main of https://github.com/mojaloop/central-ledger into feature/position-prepare-binning

* chore: foreach to for loop

* chore: cleanup

* Update src/handlers/index.js

Co-authored-by: Miguel de Barros <[email protected]>

* chore: added readme doc

* fix: lint

* fix: unit tests

* fix: audit

* chore: cleanup

* fix(mojaloop/#3533): helm v15.2.0-rc fixes (#982)

fix(mojaloop/#3533): helm v15.2.0-rc fixes - mojaloop/project#3533

List of fixes:
- fix(mojaloop/#3580): missing toDestination on handling the fspiop source/destiation headers failing match validation on fulfil - regression on #2697 - v17.0.0...v17.2.0#diff-3a2d4aabbde0cd9517dd372f6ae6001ad607d005b5316785c8698fe25160aa92L393 - mojaloop/project#3580
    Fixes currently resolve regression failures on these tests:
        - p2p_money_transfer_put_notifications - payee receives no Notification with ABORTED status after sending invalid FSPIOP-Destination header with transferStatus=COMMITTED, file path: golden_path/bug fixes /Test for Bugfix #2697 - Central-Ledger Fulfil Handler does not correctly invalidate requests with an incorrect-non-existent FSP-ID in the FSPIOP-Destination header.json
        - p2p_money_transfer_patch_notifications - payee receives PATCH Notification with ABORTED status after sending invalid FSPIOP-Destination header with transferStatus=RESERVED, file path: golden_path/bug fixes/Test for Bugfix #2697 - Central-Ledger Fulfil Handler does not correctly invalidate requests with an incorrect-non-existent FSP-ID in the FSPIOP-Destination header.json

* chore(release): 17.3.0 [skip ci]

* chore: remove uuid4 (#976)

chore: remove uuid4
 - Remove uuid4 and use native randomUUID.
 - Bump some deps are quested by commit hook

* chore(release): 17.3.1 [skip ci]

* fix(mojaloop/#3615): upgrade dependencies (#985)

* chore(release): 17.3.2 [skip ci]

* fix: remove unneeded async/await

* feat: add validation for participantCurrencyIds / accountIds mapping

* chore: remove resolved TODOs

* test: Add test coverage

* chore: remove whitespaces

* fix: fixrebase  merge conflict

* chore: update dependencies

* doc: fix typo

* chore(snapshot): 17.4.0-snapshot.0

* chore(snapshot): 17.4.0-snapshot.1

* test: update unit tests

* chore(snapshot): 17.4.0-snapshot.2

* fix: route error callback for batch correctly

* chore(snapshot): 17.4.0-snapshot.3

* fix(batch): fix error callback message routing. update tests

* chore(snapshot): 17.4.0-snapshot.4

* fix(batch): fix message routing / properties for more error cases

* chore(snapshot): 17.4.0-snapshot.5

* test: update test coverage

* chore(snapshot): 17.4.0-snapshot.6

* fix: route bulk-prepare messages to non-batch prepare handler

* chore(snapshot): 17.4.0-snapshot.7

* doc: update comment

* chore: update deps. update README for batch.

* doc: Address TODO regarding participanLimits query optimization

* stash

* chore: domain

* chore: test

* test

* chore: update bin processor tests

* chore: coverage

* log

* chore: update

* chore: up retries

* chore: test

* ci

* test

* install

* test

* test

* test

* script

* chore: fix order

* time

* test

* time

* time

* state

* time

* chore: check transfer state

* test

* lint

* chore: await

* address comments

* chore: dep

* chore: address comments

* coverage

* chore(snapshot): 17.5.0-snapshot.0

* remove

---------

Co-authored-by: Vijay <[email protected]>
Co-authored-by: Aarón Reynoza <[email protected]>
Co-authored-by: mojaloopci <[email protected]>
Co-authored-by: Sridevi Miriyala <[email protected]>
Co-authored-by: vijayg10 <[email protected]>
Co-authored-by: Miguel de Barros <[email protected]>
Co-authored-by: Marco Ippolito <[email protected]>
Co-authored-by: Steven Oderayi <[email protected]>
  • Loading branch information
9 people authored Dec 13, 2023
1 parent bd7ce1f commit dddedf6
Show file tree
Hide file tree
Showing 19 changed files with 1,496 additions and 56 deletions.
6 changes: 4 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ defaults_machine_Dependencies: &defaults_machine_Dependencies |
sudo add-apt-repository -y "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"

## Install deps
sudo apt install -y librdkafka-dev curl bash
sudo apt install -y librdkafka-dev curl bash musl-dev libsasl2-dev
sudo ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1

defaults_awsCliDependencies: &defaults_awsCliDependencies |
apk --no-cache add aws-cli
Expand Down Expand Up @@ -345,14 +346,15 @@ jobs:
command: |
# Set Node version to default (Note: this is needed on Ubuntu)
nvm use default
npm ci
echo "Running integration tests...."
bash ./test/scripts/test-integration.sh
environment:
ENDPOINT_URL: http://localhost:4545/notification
UV_THREADPOOL_SIZE: 12
WAIT_FOR_REBALANCE: 20
TEST_INT_RETRY_COUNT: 20
TEST_INT_RETRY_COUNT: 30
TEST_INT_RETRY_DELAY: 2
TEST_INT_REBALANCE_DELAY: 20000
- store_artifacts:
Expand Down
2 changes: 1 addition & 1 deletion .circleci/curl-retry-cl-health.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
url="http://localhost:3001/health"

# Number of retries
retries=10
retries=30

# Sleep between retries
sleepwait=1
Expand Down
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ diverges from the defaults.
You can configure the customized topic names in the config. Each position action key
refers to position messages with associated actions.

NOTE: Only POSITION.PREPARE is supported at this time, with additional event-type-actions being added later when required.
NOTE: Only POSITION.PREPARE and POSITION.COMMIT is supported at this time, with additional event-type-actions being added later when required.

```
"KAFKA": {
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": "topic-transfer-position-batch"
"PREPARE": "topic-transfer-position-batch",
"COMMIT": "topic-transfer-position-batch"
}
}
}
Expand All @@ -123,7 +124,10 @@ Batch processing can be enabled in the transfer execution flow. Follow the steps
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": "topic-transfer-position-batch",
"BULK_PREPARE": "topic-transfer-position"
"BULK_PREPARE": "topic-transfer-position",
"COMMIT": "topic-transfer-position-batch",
"BULK_COMMIT": "topic-transfer-position",
"RESERVE": "topic-transfer-position",
}
}
}
Expand Down Expand Up @@ -243,7 +247,10 @@ env "CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-
```
- Additionally, run position batch handler in a new terminal
```
env "CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch" "CLEDG_HANDLERS__API__DISABLED=true" node src/handlers/index.js handler --positionbatch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__COMMIT=topic-transfer-position-batch
export CLEDG_HANDLERS__API__DISABLED=true
node src/handlers/index.js handler --positionbatch
```
- Run tests using `npx tape 'test/integration-override/**/handlerBatch.test.js'`

Expand Down
5 changes: 4 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": null,
"BULK_PREPARE": null
"BULK_PREPARE": null,
"COMMIT": null,
"BULK_COMMIT": null,
"RESERVE": null
}
},
"TOPIC_TEMPLATES": {
Expand Down
19 changes: 12 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/central-ledger",
"version": "17.4.1",
"version": "17.5.0-snapshot.0",
"description": "Central ledger hosted by a scheme to record and settle transfers",
"license": "Apache-2.0",
"author": "ModusBox",
Expand Down Expand Up @@ -111,7 +111,7 @@
"hapi-auth-bearer-token": "8.0.0",
"hapi-swagger": "17.2.0",
"ilp-packet": "2.2.0",
"knex": "3.0.1",
"knex": "3.1.0",
"lodash": "4.17.21",
"moment": "2.29.4",
"mongo-uri-builder": "^4.0.0",
Expand Down
2 changes: 1 addition & 1 deletion scripts/_wait4_all.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const expectedContainers = [
]

let retries = 40
const waitTimeMs = 60000
const waitTimeMs = 30000

async function main () {
const waitingMap = {}
Expand Down
45 changes: 38 additions & 7 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const Logger = require('@mojaloop/central-services-logger')
const BatchPositionModel = require('../../models/position/batch')
const BatchPositionModelCached = require('../../models/position/batchCached')
const PositionPrepareDomain = require('./prepare')
const PositionFulfilDomain = require('./fulfil')
const SettlementModelCached = require('../../models/settlement/settlementModelCached')
const Enum = require('@mojaloop/central-services-shared').Enum
const ErrorHandler = require('@mojaloop/central-services-error-handling')
Expand All @@ -52,9 +53,12 @@ const participantFacade = require('../../models/participant/facade')
*/
const processBins = async (bins, trx) => {
const transferIdList = []
iterateThroughBins(bins, (_accountID, _action, item) => {
await iterateThroughBins(bins, (_accountID, _action, item) => {
if (item.decodedPayload?.transferId) {
transferIdList.push(item.decodedPayload.transferId)
// get transferId from uriParams for fulfil messages
} else if (item.message?.value?.content?.uriParams?.id) {
transferIdList.push(item.message.value.content.uriParams.id)
}
})
// Pre fetch latest transferStates for all the transferIds in the account-bin
Expand Down Expand Up @@ -120,13 +124,28 @@ const processBins = async (bins, trx) => {
...settlementCurrencyIds.map(pc => pc.participantCurrencyId)
])

const latestTransferInfoByTransferId = await BatchPositionModel.getTransferInfoList(
trx,
transferIdList,
Enum.Accounts.TransferParticipantRoleType.PAYEE_DFSP,
Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE
)

let notifyMessages = []
let limitAlarms = []

// For each account-bin in the list
for (const accountID in bins) {
const accountBin = bins[accountID]
const actions = Object.keys(accountBin)
const isSubset = (array1, array2) =>
array2.every((element) => array1.includes(element))
// If non-prepare/non-commit action found, log error
// We need to remove this once we implement all the actions
if (!isSubset(['prepare', 'commit'], actions)) {
Logger.isErrorEnabled && Logger.error('Only prepare/commit actions are allowed in a batch')
// throw new Error('Only prepare action is allowed in a batch')
}

const settlementParticipantPosition = positions[accountIdMap[accountID].settlementCurrencyId].value
const settlementModel = currencyIdMap[accountIdMap[accountID].currencyId].settlementModel
Expand All @@ -146,12 +165,24 @@ const processBins = async (bins, trx) => {
let accumulatedTransferStateChanges = []
let accumulatedPositionChanges = []

// If non-prepare action found, log error
// We need to remove this once we implement all the actions
if (actions.length > 1 || (actions.length === 1 && actions[0] !== 'prepare')) {
Logger.isErrorEnabled && Logger.error('Only prepare action is allowed in a batch')
// throw new Error('Only prepare action is allowed in a batch')
}
// If fulfil action found then call processPositionPrepareBin function
const fulfilActionResult = await PositionFulfilDomain.processPositionFulfilBin(
accountBin.commit,
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
latestTransferInfoByTransferId
)

// Update accumulated values
accumulatedPositionValue = fulfilActionResult.accumulatedPositionValue
accumulatedPositionReservedValue = fulfilActionResult.accumulatedPositionReservedValue
accumulatedTransferStates = fulfilActionResult.accumulatedTransferStates
// Append accumulated arrays
accumulatedTransferStateChanges = accumulatedTransferStateChanges.concat(fulfilActionResult.accumulatedTransferStateChanges)
accumulatedPositionChanges = accumulatedPositionChanges.concat(fulfilActionResult.accumulatedPositionChanges)
notifyMessages = notifyMessages.concat(fulfilActionResult.notifyMessages)

// If prepare action found then call processPositionPrepareBin function
const prepareActionResult = await PositionPrepareDomain.processPositionPrepareBin(
accountBin.prepare,
Expand Down
Loading

0 comments on commit dddedf6

Please sign in to comment.