diff --git a/src/appmixer/kafka/config.js b/src/appmixer/kafka/config.js
new file mode 100644
index 000000000..e86ff24c0
--- /dev/null
+++ b/src/appmixer/kafka/config.js
@@ -0,0 +1,15 @@
+'use strict';
+
+module.exports = context => {
+
+    return {
+        reconnectJob: {
+
+            schedule: context.config.reconnectSchedule || '0 */1 * * * *'
+        },
+        disconnectJob: {
+
+            schedule: context.config.disconnectSchedule || '0 */1 * * * *'
+        }
+    };
+};
diff --git a/src/appmixer/kafka/connections.js b/src/appmixer/kafka/connections.js
new file mode 100644
index 000000000..fdf7bf70e
--- /dev/null
+++ b/src/appmixer/kafka/connections.js
@@ -0,0 +1,91 @@
+const RegexParser = require('regex-parser');
+const { kafka } = require('./common.js');
+
+let openConnections = {};
+
+const initializeKafkaConsumer = ({ groupId, authDetails }) => {
+
+    const kafkaMaster = kafka();
+    kafkaMaster.init(authDetails);
+    return kafkaMaster.createConsumer({ groupId });
+};
+
+const processMessageHeaders = (headers) => {
+
+    const processedHeaders = {};
+    if (headers) {
+        Object.entries(headers).forEach(([key, value]) => {
+            processedHeaders[key] = Buffer.isBuffer(value) ? value.toString('utf8') : value || '';
+        });
+    }
+    return processedHeaders;
+};
+
+const processMessageData = (message) => {
+
+    return {
+        key: message.key.toString(),
+        value: message.value.toString(),
+        headers: processMessageHeaders(message.headers)
+    };
+};
+
+const addConnection = async (context, component) => {
+
+    const { topics, flowId, componentId, fromBeginning } = component;
+
+    const topicSubscriptions = topics.AND.map(topic =>
+        topic.topic.startsWith('/') ? RegexParser(topic.topic) : topic.topic
+    );
+
+    const connectionId = `${flowId}:${componentId}`;
+    if (openConnections[connectionId]) return; // Connection already exists, do nothing
+
+    const connection = initializeKafkaConsumer(component);
+    openConnections[connectionId] = connection;
+
+
+    await connection.connect();
+    await connection.subscribe({ topics: topicSubscriptions, fromBeginning: fromBeginning || false });
+
+    await connection.run({
+        eachBatchAutoResolve: false,
+        eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
+            for (const message of batch.messages) {
+                if (!isRunning() || isStale()) break;
+
+                try {
+                    await context.triggerComponent(
+                        flowId,
+                        componentId,
+                        processMessageData(message),
+                        { enqueueOnly: true }
+                    );
+                } catch (err) {
+                    if (err.message === 'Flow stopped.' || err.message === 'Missing flow.') {
+                        await connection.disconnect();
+                        delete openConnections[connectionId];
+                        break;
+                    }
+                }
+                resolveOffset(message.offset);
+                await heartbeat();
+            }
+        }
+    });
+};
+
+const removeConnection = async (component) => {
+
+    const connectionId = `${component.flowId}:${component.componentId}`;
+    const connection = openConnections[connectionId];
+    if (!connection) return; // Connection doesn't exist, do nothing
+
+    await connection.disconnect();
+    delete openConnections[connectionId];
+
+};
+
+const listConnections = () => Object.keys(openConnections);
+
+module.exports = { addConnection, removeConnection, listConnections };
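For reference, the `eachBatch` contract used in `connections.js` matches the `kafkajs` consumer API. Below is a minimal standalone sketch of the same at-least-once pattern, assuming `kafkajs` v2, a local broker, and placeholder client/group/topic names; the plugin itself obtains its consumer through the wrapper in `./common.js` rather than constructing one directly.

```js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'sketch', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'sketch-group' });

(async () => {
    await consumer.connect();
    await consumer.subscribe({ topics: ['demo-topic'], fromBeginning: false });
    await consumer.run({
        // With eachBatchAutoResolve off, only offsets we resolve explicitly are
        // committed, so a crash mid-batch replays the unprocessed remainder.
        eachBatchAutoResolve: false,
        eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
            for (const message of batch.messages) {
                if (!isRunning() || isStale()) break; // shutdown or rebalance in progress
                console.log(message.key?.toString(), message.value?.toString());
                resolveOffset(message.offset); // mark this single message as processed
                await heartbeat();             // keep the group session alive on slow batches
            }
        }
    });
})();
```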
diff --git a/src/appmixer/kafka/jobs.js b/src/appmixer/kafka/jobs.js
new file mode 100644
index 000000000..e13862527
--- /dev/null
+++ b/src/appmixer/kafka/jobs.js
@@ -0,0 +1,32 @@
+'use strict';
+const connections = require('./connections');
+
+module.exports = async (context) => {
+
+    const config = require('./config')(context);
+
+    await context.scheduleJob('reconnect', config.reconnectJob.schedule, async () => {
+
+        const registeredComponents = await context.service.loadState();
+
+        for (const component of registeredComponents) {
+            await context.log('info', 'Reconnecting the component.');
+            await connections.addConnection(context, component.value);
+        }
+    });
+
+    await context.scheduleJob('disconnect', config.disconnectJob.schedule, async () => {
+
+        const registeredComponents = await context.service.loadState();
+        const registeredComponentsKeys = new Set(registeredComponents.map(item => item.key));
+
+        for (const connectionId of connections.listConnections()) {
+            if (!registeredComponentsKeys.has(connectionId)) {
+                await connections.removeConnection({
+                    flowId: connectionId.split(':')[0],
+                    componentId: connectionId.split(':')[1]
+                });
+            }
+        }
+    });
+};
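The two jobs implement a simple reconcile loop: the service state written by the routes is the source of truth, and the set of open consumers converges to it. A hypothetical illustration of that set arithmetic, with made-up connection ids:

```js
// From context.service.loadState(): connections that should exist.
const desired = new Set(['flow-1:comp-A', 'flow-2:comp-B']);
// From connections.listConnections(): connections that currently exist.
const open = new Set(['flow-2:comp-B', 'flow-3:comp-C']);

const toOpen = [...desired].filter(id => !open.has(id));  // reconnect job re-adds these
const toClose = [...open].filter(id => !desired.has(id)); // disconnect job tears these down

console.log({ toOpen, toClose });
// { toOpen: [ 'flow-1:comp-A' ], toClose: [ 'flow-3:comp-C' ] }
```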
diff --git a/src/appmixer/kafka/platform/NewMessage/NewMessage.js b/src/appmixer/kafka/platform/NewMessage/NewMessage.js
index 40a713720..817dcc9b9 100644
--- a/src/appmixer/kafka/platform/NewMessage/NewMessage.js
+++ b/src/appmixer/kafka/platform/NewMessage/NewMessage.js
@@ -1,144 +1,37 @@
 'use strict';
-const RegexParser = require('regex-parser');
-const { kafka } = require('../../common');
-
-let consumer;
-let interval;
-let lock;
-let timeout;
-
 module.exports = {
 
-    async start(context) {
-
-        await context.stateSet('ignoreNextTick', true);
+    start(context) {
+
+        const { componentId, flowId, properties: { topics, groupId, fromBeginning } } = context;
+        return context.callAppmixer({
+            endPoint: '/plugins/appmixer/kafka/connection',
+            method: 'POST',
+            body: {
+                authDetails: context.auth,
+                groupId: groupId || `group-${componentId}:${flowId}`,
+                topics,
+                fromBeginning,
+                componentId,
+                flowId
+            }
+        });
     },
 
-    async tick(context) {
-
-        if (await context.stateGet('ignoreNextTick')) {
-            await context.stateSet('ignoreNextTick', false);
-            return;
-        }
-
-        try {
-            lock = await context.lock('KafkaNewMessage-' + context.componentId, {
-                ttl: 1000 * 60 * 1,
-                maxRetryCount: 0
-            });
-        } catch (err) {
-            return;
-        }
-        return new Promise(async resolve => {
-
-            timeout = setTimeout(async () => {
-                await disconnectConsumerAndReleaseLock(context);
-                resolve();
-            }, context.config.timeout || 1000 * 60 * 10);
-
-            interval = setInterval(async () => {
-                // Extend lock if necessary
-                await lock.extend(parseInt(context.config.lockExtendTime, 10) || 1000 * 60 * 1);
-            }, context.config.interval || 30000);
-
-            consumer = initializeConsumer(context);
-
-            const flowStatusCheckInterval = setInterval(async () => {
-                // to get the `stage` of the flow
-                try {
-                    const resp = await context.callAppmixer({
-                        endPoint: `/flows/${context.flowId}`,
-                        method: 'GET',
-                        qs: {
-                            projection: 'stage'
-                        }
-                    });
-
-                    if (resp.stage === 'stopped') {
-                        await disconnectConsumerAndReleaseLock(context, flowStatusCheckInterval);
-                    }
-                } catch (error) {
-                    // check response code for 404 and disconnet the consumer
-                    if (error.response.status === 404) {
-                        await disconnectConsumerAndReleaseLock(context, flowStatusCheckInterval);
-                    }
-                }
-            }, context.config.flowStatusCheckInterval || 1000);
+    stop(context) {
 
-            await runConsumer(context);
+        return context.callAppmixer({
+            endPoint: `/plugins/appmixer/kafka/connection/${context.flowId}/${context.componentId}`,
+            method: 'DELETE'
         });
-    }
-};
-
-async function disconnectConsumerAndReleaseLock(context, flowStatusCheckInterval) {
-
-    if (consumer) {
-        await context.log({ message: 'Disconnecting consumer...' });
-        clearInterval(flowStatusCheckInterval);
-        await consumer.disconnect();
-        consumer = null;
-        clearInterval(interval);
-        clearTimeout(timeout);
-        lock && await lock.unlock();
-    }
-}
-
-function initializeConsumer(context) {
-
-    const { groupId } = context.properties;
-    const kafkaMaster = kafka();
-    kafkaMaster.init(context.auth);
-    return kafkaMaster.createConsumer({ groupId: groupId || `group-${context.componentId}:${context.flowId}` });
-}
-
-async function runConsumer(context) {
-
-    const { topics } = context.properties;
-
-    const topicSubscriptions = topics.AND.map(topic => {
-
-        if (topic.topic.startsWith('/')) {
-            return RegexParser(topic.topic);
-        } else {
-            return topic.topic;
-        }
-    });
-
-    await consumer.connect();
-    await consumer.subscribe({ topics: topicSubscriptions, fromBeginning: true });
-
-    await consumer.run({
-        eachBatchAutoResolve: false,
-        eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
-
-            for (let message of batch.messages) {
-
-                if (!isRunning() || isStale()) {
-                    break;
-                }
-                await processMessage(context, message);
-                resolveOffset(message.offset);
-                await heartbeat();
-            }
-        }
-    });
-}
+    },
 
-async function processMessage(context, message) {
+    async receive(context) {
 
-    const headers = {};
-    if (message.headers) {
-        for (const key of Object.keys(message.headers)) {
-            const header = message.headers[key];
-            headers[key] = Buffer.isBuffer(header) ? header.toString('utf8') : (header || '');
+        if (context.messages.webhook) {
+            await context.sendJson(context.messages.webhook.content.data, 'out');
+            return context.response();
         }
     }
-
-    const out = {
-        key: message.key.toString(),
-        value: message.value.toString(),
-        headers: headers
-    };
-    await context.sendJson(out, 'out');
-}
+};
diff --git a/src/appmixer/kafka/platform/NewMessage/component.json b/src/appmixer/kafka/platform/NewMessage/component.json
index a6349121f..dda3c9b06 100644
--- a/src/appmixer/kafka/platform/NewMessage/component.json
+++ b/src/appmixer/kafka/platform/NewMessage/component.json
@@ -1,5 +1,5 @@
 {
     "name": "appmixer.kafka.platform.NewMessage",
     "author": "Harsha Surisetty ",
     "description": "Watch new messages in a topic.",
     "private": false,
@@ -14,8 +14,14 @@
         "schema": {
             "type": "object",
             "properties": {
+                "groupId": {
+                    "type": "string"
+                },
                 "topics": {
                     "type": "object"
+                },
+                "fromBeginning": {
+                    "type": "boolean"
                 }
             },
             "required": [
@@ -47,7 +53,13 @@
             },
             "index": 1,
             "tooltip": "Each topic can be added by \"Add\" button. If a topic starts with the / character, it is considered as a regular expression."
-
+        },
+        "fromBeginning": {
+            "type": "toggle",
+            "label": "From Beginning",
+            "index": 2,
+            "defaultValue": false,
+            "tooltip": "When 'From Beginning' is true, the group will use the earliest offset. If set to false, it will use the latest offset. The default is false."
         }
     }
 }
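The `topics` property keeps the shape consumed by `connections.js`: entries under `topics.AND`, each holding a string that is treated as a regular expression when it starts with `/`, exactly as the tooltip describes. A small sketch of that mapping, with made-up topic names:

```js
const RegexParser = require('regex-parser');

// Same shape the component posts to the plugin: topics.AND[*].topic.
const topics = { AND: [{ topic: 'orders' }, { topic: '/^audit\\..*/' }] };

const topicSubscriptions = topics.AND.map(t =>
    t.topic.startsWith('/') ? RegexParser(t.topic) : t.topic
);

console.log(topicSubscriptions); // [ 'orders', /^audit\..*/ ]
```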
diff --git a/src/appmixer/kafka/plugin.js b/src/appmixer/kafka/plugin.js
new file mode 100644
index 000000000..1ca27cecd
--- /dev/null
+++ b/src/appmixer/kafka/plugin.js
@@ -0,0 +1,12 @@
+'use strict';
+
+module.exports = async context => {
+    context.log('info', 'Initializing Kafka plugin.');
+
+    require('./routes')(context);
+
+    context.log('info', 'Scheduling Kafka jobs.');
+    await require('./jobs')(context);
+
+    context.log('info', 'Kafka plugin initialized.');
+};
diff --git a/src/appmixer/kafka/routes.js b/src/appmixer/kafka/routes.js
new file mode 100644
index 000000000..35e45acaa
--- /dev/null
+++ b/src/appmixer/kafka/routes.js
@@ -0,0 +1,35 @@
+'use strict';
+const connections = require('./connections');
+
+module.exports = (context) => {
+
+    context.http.router.register({
+        method: 'POST',
+        path: '/connection',
+        options: {
+            handler: async (req, h) => {
+
+                const { flowId, componentId } = req.payload;
+
+                await context.service.stateSet(`${flowId}:${componentId}`, req.payload);
+                await connections.addConnection(context, req.payload);
+                return h.response({});
+            }
+        }
+    });
+
+    context.http.router.register({
+        method: 'DELETE',
+        path: '/connection/{flowId}/{componentId}',
+        options: {
+            handler: async (req, h) => {
+
+                const { flowId, componentId } = req.params;
+
+                await context.service.stateUnset(`${flowId}:${componentId}`);
+                await connections.removeConnection({ flowId, componentId });
+                return h.response({});
+            }
+        }
+    });
+};
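For a manual smoke test, the two routes can be exercised directly. This is a hypothetical sketch using Node 18's global fetch; the base URL, ids, and empty credentials are placeholders, and it assumes the plugin router mounts under /plugins/appmixer/kafka, as the component's callAppmixer endpoints suggest. In production the NewMessage component performs these calls itself in start() and stop().

```js
const base = 'http://localhost:2200/plugins/appmixer/kafka'; // placeholder base URL

(async () => {
    // Register a consumer: persists the component in service state and opens it.
    await fetch(`${base}/connection`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
            flowId: 'flow-1',
            componentId: 'component-1',
            groupId: 'group-component-1:flow-1',
            topics: { AND: [{ topic: 'orders' }] },
            fromBeginning: false,
            authDetails: {} // normally taken from context.auth
        })
    });

    // Unregister: removes the state entry and disconnects the consumer.
    await fetch(`${base}/connection/flow-1/component-1`, { method: 'DELETE' });
})();
```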