Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mojaloop/#3524): add position fulfil to binprocessor #990

Merged
merged 122 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 116 commits
Commits
Show all changes
122 commits
Select commit Hold shift + click to select a range
1f02497
feat: added skeleton and comments for prepare bin implementation
vijayg10 Sep 6, 2023
078f212
chore: some changes
vijayg10 Sep 6, 2023
44e4735
feat: added new position handler v2
vijayg10 Sep 7, 2023
83db860
feat: added some functionality to new position handler v2
vijayg10 Sep 7, 2023
4221b8c
feat: added some todos
vijayg10 Sep 7, 2023
6caa4e4
feat: changed new handler name
vijayg10 Sep 8, 2023
f8ac314
fix: lint
vijayg10 Sep 8, 2023
c5377d6
feat: add some changes
vijayg10 Sep 8, 2023
0112ccb
fix: lint
vijayg10 Sep 8, 2023
4424513
chore: added copyright header
vijayg10 Sep 8, 2023
451982e
feat: added some logic
vijayg10 Sep 8, 2023
026c4ef
fix: lint
vijayg10 Sep 8, 2023
41de3fc
feat: added unit tests
vijayg10 Sep 11, 2023
ca1fd41
feat: some improvements
vijayg10 Sep 11, 2023
39e74ee
feat: added bin processor logic
vijayg10 Sep 12, 2023
c300269
fix: lint
vijayg10 Sep 12, 2023
47a821c
feat; refactor
vijayg10 Sep 13, 2023
56e1b53
feat: added limit
vijayg10 Sep 13, 2023
7cf5cc4
fix: unit tests
vijayg10 Sep 13, 2023
4a2cd64
Merge branch 'main' of https://github.com/mojaloop/central-ledger int…
vijayg10 Sep 14, 2023
0ebb926
feat: added integration test
vijayg10 Sep 14, 2023
ea75e60
feat: refactor
vijayg10 Sep 14, 2023
8e775e9
fix: unit tests
vijayg10 Sep 14, 2023
98c48bc
fix: integration tests
vijayg10 Sep 14, 2023
6318ce1
feat: add position prepare bin logic (#970)
kleyow Sep 14, 2023
ba418ea
chore: add tests (#972)
kleyow Sep 15, 2023
e29b8c3
fix: metrics
vijayg10 Sep 15, 2023
6c84aa7
Merge branch feature/position-prepare-binnning of https://github.com/…
vijayg10 Sep 15, 2023
2190012
fix: some issue with bin processor
vijayg10 Sep 18, 2023
26f4b37
feat(mojaloop/#3489): add cached calls for participant currency, upda…
kleyow Sep 19, 2023
e958543
chore: improve coverage and edit metric and query name (#974)
kleyow Sep 21, 2023
d26d222
chore(snapshot): 17.3.0-snapshot.0
vijayg10 Sep 22, 2023
39c5281
fix: test downgrade stream lib
vijayg10 Sep 22, 2023
ac12981
chore(snapshot): 17.3.0-snapshot.1
vijayg10 Sep 22, 2023
8df11e9
fix: notification event action
vijayg10 Sep 25, 2023
8dbcdd2
chore(snapshot): 17.3.0-snapshot.2
vijayg10 Sep 25, 2023
71faf0d
fix: notification event
vijayg10 Sep 25, 2023
531bc0a
chore(snapshot): 17.3.0-snapshot.3
vijayg10 Sep 25, 2023
983dc51
fix: notification messages
vijayg10 Sep 25, 2023
72062f6
chore(snapshot): 17.3.0-snapshot.4
vijayg10 Sep 25, 2023
e9227c3
chore: updated conf
vijayg10 Sep 26, 2023
d393f71
chore: update deps
vijayg10 Sep 26, 2023
e24886e
chore(snapshot): 17.3.0-snapshot.5
vijayg10 Sep 26, 2023
b8b4cb2
fix(mojaloop/#3529): fix high latency (#981)
aaronreynoza Oct 2, 2023
c9032a4
chore(release): 17.2.1 [skip ci]
Oct 2, 2023
695f5e4
feat(mojaloop/#3489): add negative integration tests (#975)
sri-miriyala Sep 27, 2023
48d1ddb
Merge branch main of https://github.com/mojaloop/central-ledger into …
vijayg10 Oct 10, 2023
bc09d45
chore: foreach to for loop
vijayg10 Oct 10, 2023
c3a70c4
chore: cleanup
vijayg10 Oct 10, 2023
fdeb31e
Update src/handlers/index.js
vijayg10 Oct 10, 2023
ceda96a
chore: added readme doc
vijayg10 Oct 10, 2023
557df15
fix: lint
vijayg10 Oct 10, 2023
d8a27a2
fix: unit tests
vijayg10 Oct 10, 2023
72c333e
fix: audit
vijayg10 Oct 10, 2023
e9b191f
chore: cleanup
vijayg10 Oct 10, 2023
ab06b45
fix(mojaloop/#3533): helm v15.2.0-rc fixes (#982)
mdebarros Oct 27, 2023
627a795
chore(release): 17.3.0 [skip ci]
Oct 27, 2023
31ad569
chore: remove uuid4 (#976)
marco-ippolito Nov 1, 2023
898cf29
chore(release): 17.3.1 [skip ci]
Nov 1, 2023
92079d1
fix(mojaloop/#3615): upgrade dependencies (#985)
oderayi Nov 6, 2023
bdc531b
chore(release): 17.3.2 [skip ci]
Nov 6, 2023
3b34e67
fix: remove unneeded async/await
oderayi Nov 9, 2023
12acf12
feat: add validation for participantCurrencyIds / accountIds mapping
oderayi Nov 9, 2023
982bf6e
chore: remove resolved TODOs
oderayi Nov 9, 2023
7871fa8
test: Add test coverage
oderayi Nov 9, 2023
ae88d68
chore: remove whitespaces
oderayi Nov 9, 2023
008b751
fix: fixrebase merge conflict
oderayi Nov 9, 2023
bd499f2
chore: update dependencies
oderayi Nov 9, 2023
3689a6c
chore: resolve conflicts
oderayi Nov 9, 2023
518439f
doc: fix typo
oderayi Nov 10, 2023
2b2c429
chore(snapshot): 17.4.0-snapshot.0
oderayi Nov 10, 2023
b339177
chore(snapshot): 17.4.0-snapshot.1
oderayi Nov 13, 2023
27f7491
test: update unit tests
oderayi Nov 13, 2023
c6aae40
chore(snapshot): 17.4.0-snapshot.2
oderayi Nov 13, 2023
378dc42
fix: route error callback for batch correctly
oderayi Nov 14, 2023
eb02c2b
chore(snapshot): 17.4.0-snapshot.3
oderayi Nov 14, 2023
7b21d9b
fix(batch): fix error callback message routing. update tests
oderayi Nov 14, 2023
5c00e8e
chore(snapshot): 17.4.0-snapshot.4
oderayi Nov 14, 2023
cc92e28
fix(batch): fix message routing / properties for more error cases
oderayi Nov 15, 2023
1533b5a
chore(snapshot): 17.4.0-snapshot.5
oderayi Nov 15, 2023
da295b6
test: update test coverage
oderayi Nov 15, 2023
6079f61
chore(snapshot): 17.4.0-snapshot.6
oderayi Nov 15, 2023
3cd3ced
fix: route bulk-prepare messages to non-batch prepare handler
oderayi Nov 16, 2023
06049e8
chore(snapshot): 17.4.0-snapshot.7
oderayi Nov 16, 2023
e325497
doc: update comment
oderayi Nov 16, 2023
d04cfef
chore: update deps. update README for batch.
oderayi Nov 20, 2023
d827219
doc: Address TODO regarding participanLimits query optimization
oderayi Nov 21, 2023
640e3d9
stash
kleyow Dec 1, 2023
2a5f50b
chore: domain
kleyow Dec 1, 2023
7a4a726
chore: test
kleyow Dec 1, 2023
57c00a7
test
kleyow Dec 1, 2023
3280eed
chore: update bin processor tests
kleyow Dec 4, 2023
55a9a85
chore: coverage
kleyow Dec 4, 2023
7b968a0
log
kleyow Dec 4, 2023
a80b335
Merge remote-tracking branch 'origin/main' into feat/position-fulfil-…
kleyow Dec 5, 2023
15e2613
chore: update
kleyow Dec 6, 2023
6ca898b
chore: up retries
kleyow Dec 6, 2023
1e4ee76
chore: test
kleyow Dec 6, 2023
c2bf642
ci
kleyow Dec 6, 2023
3d18120
test
kleyow Dec 6, 2023
2512cd0
install
kleyow Dec 6, 2023
a5b2b42
test
kleyow Dec 6, 2023
3fa5654
test
kleyow Dec 6, 2023
ad60a52
test
kleyow Dec 7, 2023
fa7ddeb
script
kleyow Dec 7, 2023
dfde6db
chore: fix order
kleyow Dec 7, 2023
c2cd5e1
time
kleyow Dec 7, 2023
7774be1
test
kleyow Dec 7, 2023
9b3c586
time
kleyow Dec 7, 2023
aba070e
time
kleyow Dec 7, 2023
3980b2d
state
kleyow Dec 7, 2023
d155773
time
kleyow Dec 7, 2023
a16f9b5
chore: check transfer state
kleyow Dec 7, 2023
651efd2
test
kleyow Dec 7, 2023
be8b98c
lint
kleyow Dec 7, 2023
b38e7f6
chore: await
kleyow Dec 7, 2023
7c5f250
address comments
kleyow Dec 8, 2023
f3fb493
chore: dep
kleyow Dec 8, 2023
a7daff5
chore: address comments
kleyow Dec 8, 2023
5158115
coverage
kleyow Dec 8, 2023
1acebbb
chore(snapshot): 17.5.0-snapshot.0
kleyow Dec 11, 2023
f660f3f
remove
kleyow Dec 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ defaults_machine_Dependencies: &defaults_machine_Dependencies |
sudo add-apt-repository -y "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"

## Install deps
sudo apt install -y librdkafka-dev curl bash
sudo apt install -y librdkafka-dev curl bash musl-dev libsasl2-dev
sudo ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1

defaults_awsCliDependencies: &defaults_awsCliDependencies |
apk --no-cache add aws-cli
Expand Down Expand Up @@ -345,14 +346,15 @@ jobs:
command: |
# Set Node version to default (Note: this is needed on Ubuntu)
nvm use default
npm ci

echo "Running integration tests...."
bash ./test/scripts/test-integration.sh
environment:
ENDPOINT_URL: http://localhost:4545/notification
UV_THREADPOOL_SIZE: 12
WAIT_FOR_REBALANCE: 20
TEST_INT_RETRY_COUNT: 20
TEST_INT_RETRY_COUNT: 30
TEST_INT_RETRY_DELAY: 2
TEST_INT_REBALANCE_DELAY: 20000
- store_artifacts:
Expand Down
2 changes: 1 addition & 1 deletion .circleci/curl-retry-cl-health.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
url="http://localhost:3001/health"

# Number of retries
retries=10
retries=30

# Sleep between retries
sleepwait=1
Expand Down
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ diverges from the defaults.
You can configure the customized topic names in the config. Each position action key
refers to position messages with associated actions.

NOTE: Only POSITION.PREPARE is supported at this time, with additional event-type-actions being added later when required.
NOTE: Only POSITION.PREPARE and POSITION.COMMIT is supported at this time, with additional event-type-actions being added later when required.

```
"KAFKA": {
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": "topic-transfer-position-batch"
"PREPARE": "topic-transfer-position-batch",
"COMMIT": "topic-transfer-position-batch"
}
}
}
Expand All @@ -123,7 +124,10 @@ Batch processing can be enabled in the transfer execution flow. Follow the steps
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": "topic-transfer-position-batch",
"BULK_PREPARE": "topic-transfer-position"
"BULK_PREPARE": "topic-transfer-position",
"COMMIT": "topic-transfer-position-batch",
"BULK_COMMIT": "topic-transfer-position",
"RESERVE": "topic-transfer-position",
}
}
}
Expand Down Expand Up @@ -243,7 +247,10 @@ env "CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-
```
- Additionally, run position batch handler in a new terminal
```
env "CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch" "CLEDG_HANDLERS__API__DISABLED=true" node src/handlers/index.js handler --positionbatch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__PREPARE=topic-transfer-position-batch
export CLEDG_KAFKA__EVENT_TYPE_ACTION_TOPIC_MAP__POSITION__COMMIT=topic-transfer-position-batch
export CLEDG_HANDLERS__API__DISABLED=true
node src/handlers/index.js handler --positionbatch
```
- Run tests using `npx tape 'test/integration-override/**/handlerBatch.test.js'`

Expand Down
5 changes: 4 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@
"EVENT_TYPE_ACTION_TOPIC_MAP" : {
"POSITION":{
"PREPARE": null,
"BULK_PREPARE": null
"BULK_PREPARE": null,
"COMMIT": null,
"BULK_COMMIT": null,
"RESERVE": null
}
},
"TOPIC_TEMPLATES": {
Expand Down
2 changes: 1 addition & 1 deletion scripts/_wait4_all.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const expectedContainers = [
]

let retries = 40
const waitTimeMs = 60000
const waitTimeMs = 30000

async function main () {
const waitingMap = {}
Expand Down
39 changes: 31 additions & 8 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const Logger = require('@mojaloop/central-services-logger')
const BatchPositionModel = require('../../models/position/batch')
const BatchPositionModelCached = require('../../models/position/batchCached')
const PositionPrepareDomain = require('./prepare')
const PositionFulfilDomain = require('./fulfil')
const SettlementModelCached = require('../../models/settlement/settlementModelCached')
const Enum = require('@mojaloop/central-services-shared').Enum
const ErrorHandler = require('@mojaloop/central-services-error-handling')
Expand All @@ -52,9 +53,12 @@ const participantFacade = require('../../models/participant/facade')
*/
const processBins = async (bins, trx) => {
const transferIdList = []
iterateThroughBins(bins, (_accountID, _action, item) => {
await iterateThroughBins(bins, (_accountID, _action, item) => {
if (item.decodedPayload?.transferId) {
transferIdList.push(item.decodedPayload.transferId)
// get transferId from uriParams for fulfil messages
} else if (item.message?.value?.content?.uriParams?.id) {
transferIdList.push(item.message.value.content.uriParams.id)
}
})
// Pre fetch latest transferStates for all the transferIds in the account-bin
Expand Down Expand Up @@ -120,13 +124,20 @@ const processBins = async (bins, trx) => {
...settlementCurrencyIds.map(pc => pc.participantCurrencyId)
])

const latestTransferInfoByTransferId = await BatchPositionModel.getTransferInfosToChangePosition(
trx,
transferIdList,
Enum.Accounts.TransferParticipantRoleType.PAYEE_DFSP,
Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE
)

vijayg10 marked this conversation as resolved.
Show resolved Hide resolved
let notifyMessages = []
let limitAlarms = []

// For each account-bin in the list
for (const accountID in bins) {
const accountBin = bins[accountID]
const actions = Object.keys(accountBin)
// const actions = Object.keys(accountBin)

const settlementParticipantPosition = positions[accountIdMap[accountID].settlementCurrencyId].value
const settlementModel = currencyIdMap[accountIdMap[accountID].currencyId].settlementModel
Expand All @@ -146,12 +157,24 @@ const processBins = async (bins, trx) => {
let accumulatedTransferStateChanges = []
let accumulatedPositionChanges = []

// If non-prepare action found, log error
// We need to remove this once we implement all the actions
if (actions.length > 1 || (actions.length === 1 && actions[0] !== 'prepare')) {
Logger.isErrorEnabled && Logger.error('Only prepare action is allowed in a batch')
// throw new Error('Only prepare action is allowed in a batch')
}
vijayg10 marked this conversation as resolved.
Show resolved Hide resolved
// If fulfil action found then call processPositionPrepareBin function
const fulfilActionResult = await PositionFulfilDomain.processPositionFulfilBin(
accountBin.commit,
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
latestTransferInfoByTransferId
)
vijayg10 marked this conversation as resolved.
Show resolved Hide resolved

// Update accumulated values
accumulatedPositionValue = fulfilActionResult.accumulatedPositionValue
accumulatedPositionReservedValue = fulfilActionResult.accumulatedPositionReservedValue
accumulatedTransferStates = fulfilActionResult.accumulatedTransferStates
// Append accumulated arrays
accumulatedTransferStateChanges = accumulatedTransferStateChanges.concat(fulfilActionResult.accumulatedTransferStateChanges)
accumulatedPositionChanges = accumulatedPositionChanges.concat(fulfilActionResult.accumulatedPositionChanges)
notifyMessages = notifyMessages.concat(fulfilActionResult.notifyMessages)

vijayg10 marked this conversation as resolved.
Show resolved Hide resolved
// If prepare action found then call processPositionPrepareBin function
const prepareActionResult = await PositionPrepareDomain.processPositionPrepareBin(
vijayg10 marked this conversation as resolved.
Show resolved Hide resolved
accountBin.prepare,
Expand Down
153 changes: 153 additions & 0 deletions src/domain/position/fulfil.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
const { Enum } = require('@mojaloop/central-services-shared')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Config = require('../../lib/config')
const Utility = require('@mojaloop/central-services-shared').Util
const MLNumber = require('@mojaloop/ml-number')
const Logger = require('@mojaloop/central-services-logger')

/**
* @function processPositionFulfilBin
*
* @async
* @description This is the domain function to process a bin of position-fulfil messages of a single participant account.
*
* @param {array} binItems - an array of objects that contain a position fulfil message and its span. {message, span}
* @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing
* @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency
* @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output.
* @param {object} accumulatedTransferInfo - object with transfer id keys and transfer info values. Used to pass transfer info to domain function.
* @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed
*/
const processPositionFulfilBin = async (
binItems,
accumulatedPositionValue,
accumulatedPositionReservedValue,
accumulatedTransferStates,
accumulatedTransferInfo
vijayg10 marked this conversation as resolved.
Show resolved Hide resolved
) => {
const transferStateChanges = []
const participantPositionChanges = []
const resultMessages = []
const accumulatedTransferStatesCopy = Object.assign({}, accumulatedTransferStates)
let runningPosition = new MLNumber(accumulatedPositionValue)

if (binItems && binItems.length > 0) {
for (const binItem of binItems) {
let transferStateId
let reason
let resultMessage
const transferId = binItem.message.value.content.uriParams.id
const payeeFsp = binItem.message.value.from
const payerFsp = binItem.message.value.to
const transfer = binItem.decodedPayload
Logger.isDebugEnabled && Logger.debug(`processPositionFulfilBin::transfer:processingMessage: ${JSON.stringify(transfer)}`)
Logger.isDebugEnabled && Logger.debug(`accumulatedTransferStates: ${JSON.stringify(accumulatedTransferStates)}`)
// Inform payee dfsp if transfer is not in RECEIVED_FULFIL state, skip making any transfer state changes
if (accumulatedTransferStates[transferId] !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL) {
// forward same headers from the prepare message, except the content-length header
// set destination to payeefsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = payeeFsp
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(
`Invalid State: ${accumulatedTransferStates[transferId]} - expected: ${Enum.Transfers.TransferInternalState.RECEIVED_FULFIL}`
).toApiErrorObject(Config.ERROR_HANDLING)
const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
fspiopError.errorInformation.errorCode,
fspiopError.errorInformation.errorDescription
)

const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transferId,
Enum.Kafka.Topics.NOTIFICATION,
Enum.Events.Event.Action.FULFIL,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
transferId,
payeeFsp,
Enum.Http.Headers.FSPIOP.SWITCH.value,
metadata,
headers,
fspiopError,
{ id: transferId },
'application/json'
)
} else {
const transferInfo = accumulatedTransferInfo[transferId]

// forward same headers from the prepare message, except the content-length header
const headers = { ...binItem.message.value.content.headers }
delete headers['content-length']

const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.SUCCESS.status,
null,
null
)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
transferId,
Enum.Kafka.Topics.TRANSFER,
Enum.Events.Event.Action.COMMIT,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
transferId,
payerFsp,
payeeFsp,
metadata,
headers,
transfer,
{ id: transferId },
'application/json'
)

transferStateId = Enum.Transfers.TransferState.COMMITTED
// Amounts in `transferParticipant` for the payee are stored as negative values
runningPosition = new MLNumber(runningPosition.add(transferInfo.amount).toFixed(Config.AMOUNT.SCALE))

const participantPositionChange = {
transferId, // Need to delete this in bin processor while updating transferStateChangeId
transferStateChangeId: null, // Need to update this in bin processor while executing queries
value: runningPosition.toNumber(),
reservedValue: accumulatedPositionReservedValue
}
participantPositionChanges.push(participantPositionChange)
binItem.result = { success: true }
}

resultMessages.push({ binItem, message: resultMessage })

if (transferStateId) {
const transferStateChange = {
transferId,
transferStateId,
reason
}
transferStateChanges.push(transferStateChange)
Logger.isDebugEnabled && Logger.debug(`processPositionFulfilBin::transferStateChange: ${JSON.stringify(transferStateChange)}`)

accumulatedTransferStatesCopy[transferId] = transferStateId
Logger.isDebugEnabled && Logger.debug(`processPositionFulfilBin::accumulatedTransferStatesCopy:finalizedTransferState ${JSON.stringify(transferStateId)}`)
}
}
}

return {
accumulatedPositionValue: runningPosition.toNumber(),
accumulatedTransferStates: accumulatedTransferStatesCopy, // finalized transfer state after fulfil processing
accumulatedPositionReservedValue, // not used but kept for consistency
accumulatedTransferStateChanges: transferStateChanges, // transfer state changes to be persisted in order
accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order
notifyMessages: resultMessages // array of objects containing bin item and result message. {binItem, message}
}
}

module.exports = {
processPositionFulfilBin
}
19 changes: 18 additions & 1 deletion src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,25 @@ const fulfil = async (error, messages) => {
await TransferService.handlePayeeResponse(transferId, payload, action)
const eventDetail = { functionality: TransferEventType.POSITION, action }
// Key position fulfil message with payee account id
let topicNameOverride
if (action === TransferEventAction.COMMIT) {
topicNameOverride = Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.COMMIT
} else if (action === TransferEventAction.RESERVE) {
vijayg10 marked this conversation as resolved.
Show resolved Hide resolved
topicNameOverride = Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.RESERVE
} else if (action === TransferEventAction.BULK_COMMIT) {
topicNameOverride = Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.BULK_COMMIT
}
const payeeAccount = await Participant.getAccountByNameAndCurrency(transfer.payeeFsp, transfer.currency, Enum.Accounts.LedgerAccountType.POSITION)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, messageKey: payeeAccount.participantCurrencyId.toString() })
await Kafka.proceed(
Config.KAFKA_CONFIG,
params,
{
consumerCommit,
eventDetail,
messageKey: payeeAccount.participantCurrencyId.toString(),
topicNameOverride
}
)
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
}
Expand Down
34 changes: 33 additions & 1 deletion src/models/position/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,37 @@ const updateParticipantPosition = async (trx, participantPositionId, participant
})
}

const getTransferInfosToChangePosition = async (trx, transferIds, transferParticipantRoleTypeId, ledgerEntryTypeId) => {
vijayg10 marked this conversation as resolved.
Show resolved Hide resolved
try {
const knex = await Db.getKnex()
const transferInfos = await knex('transferParticipant')
.transacting(trx)
.innerJoin('transferStateChange AS tsc', 'tsc.transferId', 'transferParticipant.transferId')
vijayg10 marked this conversation as resolved.
Show resolved Hide resolved
.where({
'transferParticipant.transferParticipantRoleTypeId': transferParticipantRoleTypeId,
'transferParticipant.ledgerEntryTypeId': ledgerEntryTypeId
})
.whereIn('transferParticipant.transferId', transferIds)
.select(
'transferParticipant.*',
'tsc.transferStateId',
'tsc.reason'
)
.orderBy('tsc.transferStateChangeId', 'desc')
const info = {}
// This should key the transfer info with the latest transferStateChangeId
for (const transferInfo of transferInfos) {
if (!(transferInfo.transferId in info)) {
info[transferInfo.transferId] = transferInfo
}
}
return info
} catch (err) {
Logger.isErrorEnabled && Logger.error(err)
throw err
}
}

const bulkInsertTransferStateChanges = async (trx, transferStateChangeList) => {
const knex = await Db.getKnex()
return await knex.batchInsert('transferStateChange', transferStateChangeList).transacting(trx)
Expand All @@ -121,5 +152,6 @@ module.exports = {
updateParticipantPosition,
bulkInsertTransferStateChanges,
bulkInsertParticipantPositionChanges,
getAllParticipantCurrency
getAllParticipantCurrency,
getTransferInfosToChangePosition
}
Loading
Loading