Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
harshaio committed Apr 16, 2024
1 parent ccb2361 commit 91469af
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/appmixer/kafka/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ module.exports = async (context) => {
const connectComponent = async (component) => {
const connectionId = `${component.flowId}:${component.componentId}`;
if (!existingConnections.includes(connectionId)) {
await context.log('info', `Connecting component: ${connectionId}`);
const latestState = await context.service.stateGet(connectionId);
// Check if the component is still registered
if (latestState) {
await context.log('info', `Connecting component: ${connectionId}`);
await connections.addConnection(context, component.value);
}
}
Expand All @@ -45,10 +45,10 @@ module.exports = async (context) => {
// Disconnect components that are in the existing connections but not in the service state
await Promise.allSettled(existingConnections.map(connectionId => limit(async () => {
if (!registeredComponentsKeys.has(connectionId)) {
await context.log('info', `Disconnecting component: ${connectionId}`);
const latestState = await context.service.stateGet(connectionId);
// Check if the component is still registered
if (latestState) {
await context.log('info', `Disconnecting component: ${connectionId}`);
await connections.removeConnection({
flowId: connectionId.split(':')[0],
componentId: connectionId.split(':')[1]
Expand Down
4 changes: 3 additions & 1 deletion src/appmixer/kafka/platform/SendMessage/SendMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ module.exports = {
flowId: context.flowId
};

return context.callAppmixer({
await context.callAppmixer({

endPoint: '/plugins/appmixer/kafka/connect/producer',
method: 'POST',
body: options
});

return context.sendJson(context.messages.in.content, 'out');
}
};
4 changes: 3 additions & 1 deletion src/appmixer/kafka/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ module.exports = (context) => {
handler: async (req, h) => {

const { flowId, componentId } = req.payload;
await connections.removeConnection({ flowId, componentId }, req.params.mode);
if (req.params.mode === 'consumer') {
await connections.removeConnection({ flowId, componentId }, req.params.mode);
}
await context.service.stateSet(`${flowId}:${componentId}`, req.payload);
await connections.addConnection(context, req.payload, req.params.mode);
return h.response({});
Expand Down

0 comments on commit 91469af

Please sign in to comment.