Skip to content

Commit

Permalink
feat(rules)!:separate rules handler from cgs handler (#346)
Browse files Browse the repository at this point in the history
* WIP: 2056

* WIP: 2056

* unit tests: 2056

* unit tests: 2056
  • Loading branch information
shashi165 authored Mar 3, 2021
1 parent 5bf0b97 commit 54b8bb9
Show file tree
Hide file tree
Showing 25 changed files with 25,000 additions and 6,217 deletions.
4 changes: 2 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@
"consumeTimeout": 1000
},
"rdkafkaConf": {
"client.id": "cs-con-transferfulfil-process",
"group.id": "cs-group-transferfulfil-process",
"client.id": "cs-con-notification-event",
"group.id": "cs-group-notification-event",
"metadata.broker.list": "localhost:9092",
"socket.keepalive.enable": true
},
Expand Down
30,006 changes: 23,878 additions & 6,128 deletions package-lock.json

Large diffs are not rendered by default.

25 changes: 15 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "central-settlement",
"description": "Central settlements hosted by a scheme to record and make settlements.",
"version": "12.0.3",
"version": "13.0.0",
"license": "Apache-2.0",
"private": false,
"author": "ModusBox",
Expand All @@ -27,15 +27,15 @@
"@hapi/boom": "9.1.0",
"@hapi/catbox-memory": "5.0.0",
"@hapi/good": "9.0.1",
"@hapi/hapi": "19.2.0",
"@hapi/hapi": "20.1.0",
"@hapi/inert": "6.0.1",
"@hapi/vision": "6.0.0",
"@mojaloop/central-ledger": "13.0.0",
"@mojaloop/central-services-database": "10.6.1",
"@mojaloop/central-services-error-handling": "11.1.0",
"@mojaloop/central-services-health": "11.0.0",
"@mojaloop/central-services-logger": "10.6.0",
"@mojaloop/central-services-shared": "11.5.5",
"@mojaloop/central-services-shared": "11.5.6",
"@mojaloop/central-services-stream": "10.6.0",
"@mojaloop/ml-number": "11.0.0",
"@now-ims/hapi-now-auth": "2.0.3",
Expand All @@ -46,7 +46,7 @@
"hapi-auth-bearer-token": "8.0.0",
"hapi-openapi": "2.0.2",
"hapi-swagger": "14.1.0",
"lodash": "4.17.20",
"lodash": "4.17.21",
"mustache": "4.1.0",
"parse-strings-in-object": "2.0.0",
"rc": "1.2.8",
Expand All @@ -61,25 +61,25 @@
"chai": "4.3.0",
"chai-exclude": "2.0.2",
"chai-subset": "1.6.0",
"jest": "26.6.3",
"jest-junit": "12.0.0",
"eslint": "7.20.0",
"faucet": "0.0.1",
"get-port": "5.1.1",
"jest": "26.6.3",
"jest-junit": "12.0.0",
"node-fetch": "2.6.1",
"nodemon": "2.0.7",
"npm-audit-resolver": "2.2.1",
"npm-check-updates": "11.1.1",
"npm-check-updates": "11.1.9",
"nyc": "15.1.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
"rewire": "5.0.0",
"supertest": "6.1.3",
"sinon": "9.2.4",
"standard": "16.0.3",
"supertest": "6.1.3",
"swagmock": "1.0.0",
"tap-xunit": "2.4.1",
"tape": "5.1.1",
"tape": "5.2.0",
"tapes": "4.1.0"
},
"pre-commit": [
Expand Down Expand Up @@ -113,5 +113,10 @@
"generator-swaggerize": {
"version": "4.1.0"
},
"main": "./server"
"main": "./server",
"standard": {
"ignore": [
"test/unit/data/invalidFile/dummyFeeCalculationTestScriptInvalidFile.js"
]
}
}
42 changes: 42 additions & 0 deletions src/domain/rules/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*****
License
--------------
Copyright © 2017 Bill & Melinda Gates Foundation
The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Contributors
--------------
This is the official list of the Mojaloop project contributors for this file.
Names of the original copyright holders (individuals or organizations)
should be listed with a '*' in the first column. People who have
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets <email>.
* Gates Foundation
- Name Surname <[email protected]>
* ModusBox
- Shashikant Hirugade <[email protected]>
--------------
******/

const ErrorHandler = require('@mojaloop/central-services-error-handling')
const TransferSettlementModel = require('../../models/transferSettlement')
const Logger = require('@mojaloop/central-services-logger')

module.exports = {
insertLedgerEntries: async function insertLedgerEntries (ledgerEntries, transferEventId, trx) {
Logger.debug(`rules::insertLedgerEntries - ledgerEntries=${JSON.stringify(ledgerEntries)}`)

try {
await TransferSettlementModel.insertLedgerEntries(ledgerEntries, transferEventId, trx)
return true
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

}
9 changes: 0 additions & 9 deletions src/domain/transferSettlement/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,5 @@ module.exports = {
Logger.debug('transferSettlement::processMsgFulfil - error!', err)
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
},
insertLedgerEntries: async function insertLedgerEntries (ledgerEntries, transferEventId, trx) {
try {
await TransferSettlementModel.insertLedgerEntries(ledgerEntries, transferEventId, trx)
return true
} catch (err) {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

}
10 changes: 10 additions & 0 deletions src/handlers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Program.command('handler') // sub-command name, coffeeType = type, required
.description('Start a specified Handler') // command description
.option('--settlementwindow', 'Start the Settlement Window Handler')
.option('--transfersettlement', 'Start the Transfer Settlement Handler')
.option('--rules', 'Start the Rules Handler')
// function to execute when command is used
.action(async (args) => {
const handlerList = []
Expand All @@ -73,6 +74,15 @@ Program.command('handler') // sub-command name, coffeeType = type, required
handlerList.push(handler)
}

if (args.rules === true) {
Logger.debug('CLI: Executing --rules')
const handler = {
type: 'rules',
enabled: true
}
handlerList.push(handler)
}

module.exports = Setup.initialize({
service: 'handler',
port: Config.PORT,
Expand Down
8 changes: 7 additions & 1 deletion src/handlers/register.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const Logger = require('@mojaloop/central-services-logger')
const requireGlob = require('require-glob')
const SettlementWindowHandlers = require('./settlementWindow/handler')
const TransferSettlementHandler = require('./transferSettlement/handler')
const RulesHandler = require('./rules/handler')

/**
* @module src/handlers
Expand Down Expand Up @@ -80,6 +81,11 @@ module.exports = {
},
transfersettlement: {
registerAllHandlers: TransferSettlementHandler.registerAllHandlers,
registerTransferSettlementHandler: TransferSettlementHandler.registerTransferSettlement
registerTransferSettlementHandler: TransferSettlementHandler.registerTransferSettlement,
registerRulesHandler: RulesHandler.registerRules
},
rules: {
registerAllHandlers: RulesHandler.registerAllHandlers,
registerRulesHandler: RulesHandler.registerRules
}
}
163 changes: 163 additions & 0 deletions src/handlers/rules/handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*****
License
--------------
Copyright © 2017 Bill & Melinda Gates Foundation
The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Contributors
--------------
This is the official list of the Mojaloop project contributors for this file.
Names of the original copyright holders (individuals or organizations)
should be listed with a '*' in the first column. People who have
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets <email>.
* Gates Foundation
- Name Surname <[email protected]>
* ModusBox
- Shashikant Hirugade <[email protected]>
--------------
******/
'use strict'

/**
* @module src/handlers/transfers
*/
const Config = require('../../lib/config')
const Consumer = require('@mojaloop/central-services-stream').Util.Consumer
const Enum = require('@mojaloop/central-services-shared').Enum
const ErrorHandling = require('@mojaloop/central-services-error-handling')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
const Logger = require('@mojaloop/central-services-logger')
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const RulesService = require('../../domain/rules')
const scriptsLoader = require('../../lib/scriptsLoader')
const Utility = require('@mojaloop/central-services-shared').Util
const Db = require('../../lib/db')
const LOG_LOCATION = { module: 'RulesHandler', method: '', path: '' } // var object used as pointer
const CONSUMER_COMMIT = true
const FROM_SWITCH = true

const SCRIPTS_FOLDER = Config.HANDLERS.SETTINGS.SCRIPTS_FOLDER
let INJECTED_SCRIPTS = {}

async function processRules (error, messages) {
if (error) {
Logger.error(error)
throw ErrorHandling.Factory.reformatFSPIOPError(error)
}
Logger.info(Utility.breadcrumb(LOG_LOCATION, messages))
let message = {}
try {
Logger.info(Utility.breadcrumb(LOG_LOCATION, { method: 'processRules' }))
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const payload = message.value.content.payload
const kafkaTopic = message.topic
const params = { message, kafkaTopic, decodedPayload: payload, consumer: Consumer, producer: Producer }

const transferEventId = message.value.id
const transferEventType = message.value.metadata.event.type
const transferEventAction = message.value.metadata.event.action
const transferEventStateStatus = message.value.metadata.event.state.status
const actionLetter = transferEventAction === Enum.Events.Event.Action.COMMIT
? Enum.Events.ActionLetter.commit
: Enum.Events.ActionLetter.unknown

if (!payload) {
Logger.info(Utility.breadcrumb(LOG_LOCATION, `missingPayload--${actionLetter}1`))
const fspiopError = ErrorHandling.Factory.createInternalServerFSPIOPError('Rules handler missing payload')
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action: Enum.Events.Event.Action.SETTLEMENT_WINDOW }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { CONSUMER_COMMIT, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, FROM_SWITCH })
throw fspiopError
}
Logger.info(Utility.breadcrumb(LOG_LOCATION, 'validationPassed'))

// execute the rule
Logger.info(Utility.breadcrumb(LOG_LOCATION, 'executing the scripts'))
const scriptResults = await scriptsLoader.executeScripts(INJECTED_SCRIPTS, transferEventType, transferEventAction, transferEventStateStatus, message.value)
Logger.debug(`Rules Handler - scriptResults: ${JSON.stringify(scriptResults)}`)

const ledgerEntries = scriptResults.ledgerEntries ? scriptResults.ledgerEntries : []
if (ledgerEntries.length > 0) {
const knex = Db.getKnex()
await knex.transaction(async trx => {
try {
await RulesService.insertLedgerEntries(ledgerEntries, transferEventId, trx)
await trx.commit
} catch (err) {
await trx.rollback
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
})
}
Logger.info(Utility.breadcrumb(LOG_LOCATION, `done--${actionLetter}2`))
return true
} catch (err) {
Logger.error(`${Utility.breadcrumb(LOG_LOCATION)}::${err.message}--0`, err)
return true
}
}

/**
* @function registerRules
*
* @async
* @description Registers RulesHandler for processing settlement rules. Gets Kafka config from default.json
* Calls createHandler to register the handler against the Stream Processing API
* @returns {boolean} - Returns a boolean: true if successful, or throws and error if failed
*/
async function registerRules () {
try {
if (SCRIPTS_FOLDER == null) {
throw new Error('No SCRIPTS_FOLDER configured for running the rules handler')
}
INJECTED_SCRIPTS = scriptsLoader.loadScripts(SCRIPTS_FOLDER)
const registerRulesHandler = {
command: processRules,
topicName: Kafka.transformGeneralTopicName(Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, Enum.Events.Event.Type.NOTIFICATION, Enum.Events.Event.Action.EVENT),
config: Kafka.getKafkaConfig(Config.KAFKA_CONFIG, Enum.Kafka.Config.CONSUMER, Enum.Events.Event.Type.NOTIFICATION.toUpperCase(), Enum.Events.Event.Action.EVENT.toUpperCase())
}
registerRulesHandler.config.rdkafkaConf['client.id'] = registerRulesHandler.topicName
await Consumer.createHandler(registerRulesHandler.topicName, registerRulesHandler.config, registerRulesHandler.command)
return true
} catch (err) {
Logger.error(err)
throw ErrorHandling.Factory.reformatFSPIOPError(err)
}
}

/**
* @function RegisterAllHandlers
*
* @async
* @description Registers all handlers
*
* @returns {boolean} - Returns a boolean: true if successful, or throws and error if failed
*/
async function registerAllHandlers () {
try {
await registerRules()
return true
} catch (err) {
throw ErrorHandling.Factory.reformatFSPIOPError(err)
}
}

module.exports = {
processRules,
registerAllHandlers,
registerRules
}
10 changes: 0 additions & 10 deletions src/handlers/transferSettlement/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const Logger = require('@mojaloop/central-services-logger')
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const retry = require('async-retry')
const transferSettlementService = require('../../domain/transferSettlement')
const scriptsLoader = require('../../lib/scriptsLoader')
const Utility = require('@mojaloop/central-services-shared').Util
const Db = require('../../lib/db')
const LOG_LOCATION = { module: 'TransferFulfilHandler', method: '', path: '' } // var object used as pointer
Expand All @@ -54,9 +53,6 @@ const RETRY_OPTIONS = {
maxTimeout: Config.WINDOW_AGGREGATION_RETRY_INTERVAL
}

const SCRIPTS_FOLDER = Config.HANDLERS.SETTINGS.SCRIPTS_FOLDER
let INJECTED_SCRIPTS = {}

async function processTransferSettlement (error, messages) {
if (error) {
Logger.error(error)
Expand Down Expand Up @@ -92,15 +88,10 @@ async function processTransferSettlement (error, messages) {
Logger.info(Utility.breadcrumb(LOG_LOCATION, 'validationPassed'))

if (transferEventAction === Enum.Events.Event.Action.COMMIT) {
const scriptResults = await scriptsLoader.executeScripts(INJECTED_SCRIPTS, 'notification', transferEventAction, transferEventStateStatus, message.value)
const ledgerEntries = scriptResults.ledgerEntries ? scriptResults.ledgerEntries : []
await retry(async () => { // use bail(new Error('to break before max retries'))
const knex = Db.getKnex()
await knex.transaction(async trx => {
try {
if (ledgerEntries.length > 0) {
await transferSettlementService.insertLedgerEntries(ledgerEntries, transferEventId, trx)
}
await transferSettlementService.processMsgFulfil(transferEventId, transferEventStateStatus, trx)
await trx.commit
} catch (err) {
Expand Down Expand Up @@ -129,7 +120,6 @@ async function processTransferSettlement (error, messages) {
*/
async function registerTransferSettlement () {
try {
INJECTED_SCRIPTS = scriptsLoader.loadScripts(SCRIPTS_FOLDER)
const transferFulfillHandler = {
command: processTransferSettlement,
topicName: Kafka.transformGeneralTopicName(Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, Enum.Events.Event.Type.NOTIFICATION, Enum.Events.Event.Action.EVENT),
Expand Down
Loading

0 comments on commit 54b8bb9

Please sign in to comment.