Skip to content

Commit

Permalink
refactor kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
harshaio committed Apr 10, 2024
1 parent dfb906c commit e8f7429
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 134 deletions.
15 changes: 15 additions & 0 deletions src/appmixer/kafka/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

module.exports = context => {

return {
reconnectJob: {

schedule: context.config.reconnectSchedule || '0 */1 * * * *'
},
disconnectJob: {

schedule: context.config.disconnectSchedule || '0 */1 * * * *'
}
};
};
91 changes: 91 additions & 0 deletions src/appmixer/kafka/connections.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
const RegexParser = require('regex-parser');
const { kafka } = require('./common.js');

// Registry of live Kafka consumers, keyed by `${flowId}:${componentId}`.
const openConnections = {};

// Build a Kafka consumer for the given consumer group using the
// supplied authentication details.
const initializeKafkaConsumer = ({ groupId, authDetails }) => {

    const client = kafka();
    client.init(authDetails);
    return client.createConsumer({ groupId });
};

// Normalize raw Kafka message headers into a plain object of strings.
// Buffer values are decoded as UTF-8; other falsy values become ''.
// Returns an empty object when no headers are present.
const processMessageHeaders = (headers) => {

    if (!headers) {
        return {};
    }
    return Object.fromEntries(
        Object.entries(headers).map(([name, raw]) => [
            name,
            Buffer.isBuffer(raw) ? raw.toString('utf8') : raw || ''
        ])
    );
};

// Shape a Kafka message into the payload sent to the flow component.
// Kafka message keys may be null (unkeyed messages) and values may be
// null (tombstone records); guard before calling toString() so such
// messages do not throw inside the consumer's eachBatch loop.
const processMessageData = (message) => {

    return {
        key: message.key ? message.key.toString() : null,
        value: message.value ? message.value.toString() : null,
        headers: processMessageHeaders(message.headers)
    };
};

// Open a Kafka consumer for the given component registration and start
// streaming messages into the flow. No-op if a connection for the same
// flow/component pair is already open.
// `component` is the registration payload:
// { topics, flowId, componentId, fromBeginning, groupId, authDetails }.
const addConnection = async (context, component) => {

    const { topics, flowId, componentId, fromBeginning } = component;

    // Topics starting with '/' are treated as regular expressions.
    const topicSubscriptions = topics.AND.map(topic =>
        topic.topic.startsWith('/') ? RegexParser(topic.topic) : topic.topic
    );

    const connectionId = `${flowId}:${componentId}`;
    if (openConnections[connectionId]) return; // Connection already exists, do nothing.

    const connection = initializeKafkaConsumer(component);
    // Register up-front so a concurrent addConnection for the same id
    // returns early instead of opening a duplicate consumer.
    openConnections[connectionId] = connection;

    try {
        await connection.connect();
        await connection.subscribe({ topics: topicSubscriptions, fromBeginning: fromBeginning || false });

        await connection.run({
            eachBatchAutoResolve: false,
            eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
                for (const message of batch.messages) {
                    if (!isRunning() || isStale()) break;

                    try {
                        await context.triggerComponent(
                            flowId,
                            componentId,
                            processMessageData(message),
                            { enqueueOnly: true }
                        );
                    } catch (err) {
                        if (err.message === 'Flow stopped.' || err.message === 'Missing flow.') {
                            // The flow is gone; tear the consumer down.
                            await connection.disconnect();
                            delete openConnections[connectionId];
                            break;
                        }
                        // Other errors: log and keep consuming. The offset is still
                        // resolved below so the batch can make progress.
                        await context.log('error', 'Error triggering Kafka component: ' + err.message);
                    }
                    resolveOffset(message.offset);
                    await heartbeat();
                }
            }
        });
    } catch (err) {
        // connect/subscribe/run failed. Previously the stale registry entry
        // survived here, which made every later addConnection call (including
        // the reconnect job) return early and never retry. Drop the entry and
        // best-effort disconnect before propagating the error.
        delete openConnections[connectionId];
        try {
            await connection.disconnect();
        } catch (ignored) {
            // Best-effort cleanup only.
        }
        throw err;
    }
};

// Disconnect and forget the consumer registered for the given
// flow/component pair. No-op when no such connection is open.
const removeConnection = async ({ flowId, componentId }) => {

    const connectionId = `${flowId}:${componentId}`;
    const connection = openConnections[connectionId];
    if (!connection) return; // Connection doesn't exist, do nothing.

    // Disconnect first; the registry entry is only dropped on success.
    await connection.disconnect();
    delete openConnections[connectionId];
};

// Return the ids (`flowId:componentId`) of all currently open connections.
function listConnections() {
    return Object.keys(openConnections);
}

module.exports = { addConnection, removeConnection, listConnections };
32 changes: 32 additions & 0 deletions src/appmixer/kafka/jobs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict';
const connections = require('./connections');

module.exports = async (context) => {

const config = require('./config')(context);

await context.scheduleJob('reconnect', config.reconnectJob.schedule, async () => {

const registeredComponents = await context.service.loadState();

for (const component of registeredComponents) {
await context.log('info', 'Reconnecting the component.');
await connections.addConnection(context, component.value);
}
});

await context.scheduleJob('disconnect', config.disconnectJob.schedule, async () => {

const registeredComponents = await context.service.loadState();
const registeredComponentsKeys = new Set(registeredComponents.map(item => item.key));

for (const connectionId of connections.listConnections()) {
if (!registeredComponentsKeys.has(connectionId)) {
await connections.removeConnection({
flowId: connectionId.split(':')[0],
componentId: connectionId.split(':')[1]
});
}
}
});
};
157 changes: 25 additions & 132 deletions src/appmixer/kafka/platform/NewMessage/NewMessage.js
Original file line number Diff line number Diff line change
@@ -1,144 +1,37 @@
'use strict';

const RegexParser = require('regex-parser');
const { kafka } = require('../../common');

let consumer;
let interval;
let lock;
let timeout;

module.exports = {

async start(context) {

await context.stateSet('ignoreNextTick', true);
start(context) {

const { componentId, flowId, properties: { topics, groupId, fromBeginning } } = context;
return context.callAppmixer({
endPoint: '/plugins/appmixer/kafka/connection',
method: 'POST',
body: {
authDetails: context.auth,
groupId: groupId || `group-${componentId}:${flowId}`,
topics,
fromBeginning,
componentId,
flowId
}
});
},

async tick(context) {

if (await context.stateGet('ignoreNextTick')) {
await context.stateSet('ignoreNextTick', false);
return;
}

try {
lock = await context.lock('KafkaNewMessage-' + context.componentId, {
ttl: 1000 * 60 * 1,
maxRetryCount: 0
});
} catch (err) {
return;
}
return new Promise(async resolve => {

timeout = setTimeout(async () => {
await disconnectConsumerAndReleaseLock(context);
resolve();
}, context.config.timeout || 1000 * 60 * 10);

interval = setInterval(async () => {
// Extend lock if necessary
await lock.extend(parseInt(context.config.lockExtendTime, 10) || 1000 * 60 * 1);
}, context.config.interval || 30000);

consumer = initializeConsumer(context);

const flowStatusCheckInterval = setInterval(async () => {
// to get the `stage` of the flow
try {
const resp = await context.callAppmixer({
endPoint: `/flows/${context.flowId}`,
method: 'GET',
qs: {
projection: 'stage'
}
});

if (resp.stage === 'stopped') {
await disconnectConsumerAndReleaseLock(context, flowStatusCheckInterval);
}
} catch (error) {
// Check the response code for 404 and disconnect the consumer.
if (error.response.status === 404) {
await disconnectConsumerAndReleaseLock(context, flowStatusCheckInterval);
}
}
}, context.config.flowStatusCheckInterval || 1000);
stop(context) {

await runConsumer(context);
return context.callAppmixer({
endPoint: `/plugins/appmixer/kafka/connection/${context.flowId}/${context.componentId}`,
method: 'DELETE'
});
}
};

async function disconnectConsumerAndReleaseLock(context, flowStatusCheckInterval) {

if (consumer) {
await context.log({ message: 'Disconnecting consumer...' });
clearInterval(flowStatusCheckInterval);
await consumer.disconnect();
consumer = null;
clearInterval(interval);
clearTimeout(timeout);
lock && await lock.unlock();
}
}

function initializeConsumer(context) {

const { groupId } = context.properties;
const kafkaMaster = kafka();
kafkaMaster.init(context.auth);
return kafkaMaster.createConsumer({ groupId: groupId || `group-${context.componentId}:${context.flowId}` });
}

async function runConsumer(context) {

const { topics } = context.properties;

const topicSubscriptions = topics.AND.map(topic => {

if (topic.topic.startsWith('/')) {
return RegexParser(topic.topic);
} else {
return topic.topic;
}
});

await consumer.connect();
await consumer.subscribe({ topics: topicSubscriptions, fromBeginning: true });

await consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {

for (let message of batch.messages) {

if (!isRunning() || isStale()) {
break;
}
await processMessage(context, message);
resolveOffset(message.offset);
await heartbeat();
}
}
});
}
},

async function processMessage(context, message) {
async receive(context) {

const headers = {};
if (message.headers) {
for (const key of Object.keys(message.headers)) {
const header = message.headers[key];
headers[key] = Buffer.isBuffer(header) ? header.toString('utf8') : (header || '');
if (context.messages.webhook) {
await context.sendJson(context.messages.webhook.content.data, 'out');
return context.response();
}
}

const out = {
key: message.key.toString(),
value: message.value.toString(),
headers: headers
};
await context.sendJson(out, 'out');
}
};
16 changes: 14 additions & 2 deletions src/appmixer/kafka/platform/NewMessage/component.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "appmixer.kafka.platform.NewMessage",
"name": "appmixer.kafka.platform.NewM",
"author": "Harsha Surisetty <[email protected]>",
"description": "Watch new messages in a topic.",
"private": false,
Expand All @@ -14,8 +14,14 @@
"schema": {
"type": "object",
"properties": {
"groupId": {
"type": "string"
},
"topics": {
"type": "object"
},
"fromBeginning": {
"type": "boolean"
}
},
"required": [
Expand Down Expand Up @@ -47,7 +53,13 @@
},
"index": 1,
"tooltip": "Each topic can be added by \"Add\" button. If a topic starts with the / character, it is considered as a regular expression."

},
"fromBeginning": {
"type": "toggle",
"label": "From Beginning",
"index": 2,
"defaultValue": false,
"tooltip": "When 'From Beginning' is true, the group will use the earliest offset. If set to false, it will use the latest offset. The default is false."
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/appmixer/kafka/plugin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict';

module.exports = async context => {
context.log('info', 'Initializing Kafka plugin.');

require('./routes')(context);

context.log('info', 'Scheduling Kafka jobs.');
await require('./jobs')(context);

context.log('info', 'Kafka plugin initialized.');
};
35 changes: 35 additions & 0 deletions src/appmixer/kafka/routes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';
const connections = require('./connections');

module.exports = (context) => {

context.http.router.register({
method: 'POST',
path: '/connection',
options: {
handler: async (req, h) => {

const { flowId, componentId } = req.payload;

await context.service.stateSet(`${flowId}:${componentId}`, req.payload);
await connections.addConnection(context, req.payload);
return h.response({});
}
}
});

context.http.router.register({
method: 'DELETE',
path: '/connection/{flowId}/{componentId}',
options: {
handler: async (req, h) => {

const { flowId, componentId } = req.params;

await context.service.stateUnset(`${flowId}:${componentId}`);
await connections.removeConnection({ flowId, componentId });
return h.response({});
}
}
});
};

0 comments on commit e8f7429

Please sign in to comment.