diff --git a/example/joinResistance.yaml b/example/joinResistance.yaml index 5158af8..5e103e9 100644 --- a/example/joinResistance.yaml +++ b/example/joinResistance.yaml @@ -17,11 +17,12 @@ subscribeParams: #parameters for subscribing to an event type: test acks: [] rebel: "luke" -joinResistance: | - /${( - $details := rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0].name; - $joined( "/rebelForces/-", $details); - )} +joinResistance: | + /${( + start; + $details := rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0].name; + $joined( "/rebelForces/-", $details); + )} # starts producer function send$: $publish(produceParams) # starts consumer function diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index bdc3a01..4bc6739 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -68,10 +68,6 @@ export class StatedWorkflow { this.snapshotOpts = context.snapshot || {storage: 'fs', basePath: './.state'} this.storage = storage || createStorage(this.snapshotOpts); this.snapshotManager = new SnapshotManager(this.snapshotOpts, this.storage); - - // create metrics provider - this.workflowMetrics = new WorkflowMetrics(); - this.consumers = new Map(); //key is type, value is pulsar consumer this.dispatchers = new Map(); //key is type, value Set of WorkflowDispatcher @@ -82,8 +78,6 @@ export class StatedWorkflow { "onHttp": this.onHttp.bind(this), "publish": this.publish.bind(this), "logFunctionInvocation": this.logFunctionInvocation.bind(this), - "workflow": this.workflow.bind(this), - // "recover": this.recover.bind(this), "sleep": Delay.start, "ack": this.ack.bind(this), } @@ -138,7 +132,7 @@ export class StatedWorkflow { // async ack(data) { - console.log(`acknowledging data: ${StatedREPL.stringify(data)}`); + this.templateProcessor.logger.debug(`acknowledging data: ${StatedREPL.stringify(data)}`); const dispatcherType = this.workflowDispatcher.dispatchers.get(data.type); for (let t of dispatcherType) { const dispatcher = this.workflowDispatcher.dispatcherObjects.get(t); @@ -156,19 +150,6 @@ export class StatedWorkflow { } - // setWorkflowPersistence() { - // const storage = new Storage({workflowName: this.templateProcessor.input.name}); - // const cbFn = async (data, jsonPtr, removed) => { - // try { - // await storage.persist(this.templateProcessor); - // } catch (error) { - // console.error(`Error persisting workflow state: ${error}`); - // } - // } - // this.templateProcessor.removeDataChangeCallback('/'); - // this.templateProcessor.setDataChangeCallback('/',cbFn); - // } - async logFunctionInvocation(stage, args, result, error = null, log) { const logMessage = { context: stage.name, @@ -186,7 +167,7 @@ export class StatedWorkflow { logMessage.finish = new Date().toISOString(); logMessage.out = result; } - console.log(StatedREPL.stringify(logMessage)); + this.logger.debug(StatedREPL.stringify(logMessage)); // Assuming 'logs' array is inside 'log' object if (log.logs) { @@ -280,7 +261,7 @@ export class StatedWorkflow { ], }); } catch (err) { - console.error(`Error publishing to Kafka: ${err}`); + this.logger.error(`Error publishing to Kafka: ${err}`); } finally { // Close the producer when done await producer.disconnect(); @@ -379,14 +360,11 @@ export class StatedWorkflow { const messageDataStr = message.getData().toString(); messageData = JSON.parse(messageDataStr); } catch (error) { - console.error("unable to parse data to json:", error); + this.templateProcessor.logger.error("unable to parse data to json:", error); // TODO - should we acknowledge the message here? continue; } let resolve; - this.latch = new Promise((_resolve) => { - resolve = _resolve; //we assign our resolve variable that is declared outside this promise so that our onDataChange callbacks can use it - }); // create a callback to acknowledge the message const dataAckCallback = async () => { @@ -399,7 +377,7 @@ export class StatedWorkflow { break; } } catch (error) { - console.error("Error receiving or dispatching message:", error); + this.templateProcessor.logger.error("Error receiving or dispatching message:", error); } finally { if (this.pulsarClient === undefined) { break; @@ -410,7 +388,7 @@ export class StatedWorkflow { try { await consumer.close(); } catch (error) { - console.error("Error closing consumer:", error); + this.templateProcessor.logger.error("Error closing consumer:", error); } })(); } @@ -458,10 +436,10 @@ export class StatedWorkflow { try { data = await registry.decode(message.value); } catch (error) { - console.error("Unable to parse data to JSON:", error); + this.logger.error("Unable to parse data to JSON:", error); } const ackFunction = async (data2ack) => { - console.log(`acknowledging data: ${StatedREPL.stringify(data)} with data2ack: ${StatedREPL.stringify(data2ack)}`); + this.logger.log(`acknowledging data: ${StatedREPL.stringify(data)} with data2ack: ${StatedREPL.stringify(data2ack)}`); // TODO: add ack logic } await this.workflowDispatcher.dispatchToAllSubscribers(type, data, ackFunction); @@ -510,16 +488,9 @@ export class StatedWorkflow { const str = message.value.toString(); data = JSON.parse(str); } catch (error) { - console.error("Unable to parse data to JSON:", error); - } - const ackFunction = async (data) => { - // TODO: make the below code working - // const currentOffset = this.templateProcessor.output(subscribeParamsJsonPointer + 'offset',); - // if (currentOffset < message.offset + 1) { - // await consumer.commitOffsets([{ topic, partition, offset: message.offset + 1 }]); - // this.templateProcessor.setData(subscribeParamsJsonPointer + 'offset', message.offset + 1; - // } + this.logger.error("Unable to parse data to JSON:", error); } + this.workflowDispatcher.dispatchToAllSubscribers(type, data, dataAckCallback); @@ -545,7 +516,7 @@ export class StatedWorkflow { testDataAckFunctionGenerator = (data) => { return (async () => { if (Array.isArray(clientParams.acks)) { - console.debug(`acknowledging data: ${StatedREPL.stringify(data)}`); + this.logger.debug(`acknowledging data: ${StatedREPL.stringify(data)}`); await this.templateProcessor.setData(subscribeParamsJsonPointer + '/client/acks/-',data); } }).bind(this); @@ -582,23 +553,23 @@ export class StatedWorkflow { if(clientParams.type === "test"){ await this.subscribeTest(subscriptionParams, subscribeParamsJsonPointer); } else if (clientType === 'dispatcher') { - this.logger.debug(`No 'real' subscription created because client.type='dispatcher' set for subscription params ${StatedREPL.stringify(subscriptionParams)}`); + this.templateProcessor.logger.debug(`No 'real' subscription created because client.type='dispatcher' set for subscription params ${StatedREPL.stringify(subscriptionParams)}`); this.workflowDispatcher.getDispatcher(subscriptionParams); } else if (clientType === 'http') { this.onHttp(subscriptionParams); } else if (clientType === 'cop') { - this.logger.debug(`subscribing to cop cloud event sources ${clientParams}`) + this.templateProcessor.logger.debug(`subscribing to cop cloud event sources ${clientParams}`) this.subscribeCOPKafka(subscriptionParams); }else if (clientType === 'kafka') { - this.logger.debug(`subscribing to kafka using ${clientParams}`) + this.templateProcessor.logger.debug(`subscribing to kafka using ${clientParams}`) this.createKafkaClient(clientParams); this.subscribeKafka(subscriptionParams); }else if(clientType === 'pulsar') { - this.logger.debug(`subscribing to pulsar (default) using ${clientParams}`) + this.templateProcessor.logger.debug(`subscribing to pulsar (default) using ${clientParams}`) this.createPulsarClient(clientParams); this.subscribePulsar(subscriptionParams); }else if(clientType === 'pulsarMock'){ - this.logger.debug(`subscribing to pulsarMock using ${clientParams}`) + this.templateProcessor.logger.debug(`subscribing to pulsarMock using ${clientParams}`) this.createPulsarClientMock(clientParams); this.subscribePulsar(subscriptionParams); }else{ @@ -613,7 +584,7 @@ export class StatedWorkflow { this.app = express(); this.app.use(express.json()); this.app.listen(this.port, () => { - console.log(`Server started on http://localhost:${StatedWorkflow.port}`); + this.templateProcessor.logger.log(`Server started on http://localhost:${StatedWorkflow.port}`); }); // Path = /workflow/:workflowId // workflowIdToWorkflowDispatcher @@ -621,7 +592,7 @@ export class StatedWorkflow { if (subscriptionParams.subscriberId === undefined) subscriptionParams.subscriberId = 'default-subscriberId'; const dispatcher = this.workflowDispatcher.getDispatcher(subscriptionParams); this.app.all('*', async (req, res) => { - console.debug("Received HTTP request: ", req.body, req.method, req.url); + this.templateProcessor.logger.debug("Received HTTP request: ", req.body, req.method, req.url); // Push the request and response objects to the dispatch queue to be handled by callback await dispatcher.addToQueue(req.body, ()=>{ res.send("sucess")}); }); @@ -630,82 +601,6 @@ export class StatedWorkflow { } - static async deleteStepsLogs(workflowInvocation, steps){ - await Promise.all(steps.map(s=>s.deleteLogs(workflowInvocation))); - } - - // ensures that the log object has the right structure for the workflow invocation - static initializeLog(log, workflowName, id) { - if (!log[workflowName]) log[workflowName] = {}; - if (!log[workflowName][id]) log[workflowName][id] = { - info: { - start: new Date().getTime(), - status: 'in-progress' - }, - execution: {} - }; - } - - static async persistLogRecord(stepRecord) { - this.publish( - {'type': stepRecord.workflowName, 'data': stepRecord}, - {type:'pulsar', params: {serviceUrl: 'pulsar://localhost:6650'}} - ); - } - - - async executeStep(step, input, currentLog, stepRecord) { - /* - const stepLog = { - step: step.name, - start: new Date().getTime(), - args: [input] - }; - - */ - - if (currentLog.execution[stepRecord.stepName]?.out) { - console.log(`step ${step.name} has been already executed. Skipping`); - return currentLog.execution[stepRecord.stepName].out; - } - stepRecord["start"] = new Date().getTime(); - stepRecord["args"] = input; - - // we need to pass invocation id to the step expression - step.workflowInvocation = stepRecord.workflowInvocation; - - try { - const result = await step.function.apply(this, [input]); - stepRecord.end = new Date().getTime(); - stepRecord.out = result; - currentLog.execution[stepRecord.stepName] = stepRecord; - StatedWorkflow.persistLogRecord(stepRecord); - return result; - } catch (error) { - stepRecord.end = new Date().getTime(); - stepRecord.error = {message: error.message}; - currentLog.info.status = 'failed'; - currentLog.execution[stepRecord.stepName] = stepRecord; - StatedWorkflow.persistLogRecord(stepRecord); - throw error; - } - } - finalizeLog(currentLog) { - currentLog.info.end = new Date().getTime(); - if (currentLog.info.status !== 'failed') { - currentLog.info.status = 'succeeded'; - } - } - - ensureRetention(workflowLogs) { - const maxLogs = 100; - const sortedKeys = Object.keys(workflowLogs).sort((a, b) => workflowLogs[b].info.start - workflowLogs[a].info.start); - while (sortedKeys.length > maxLogs) { - const oldestKey = sortedKeys.pop(); - delete workflowLogs[oldestKey]; - } - } - static generateUniqueId() { return `${new Date().getTime()}-${Math.random().toString(36).slice(2, 7)}`; } @@ -719,44 +614,13 @@ export class StatedWorkflow { return `${dateStr}-${timeInMs}-${randomPart}`; } - async workflow(input, steps, options={}) { - const {name: workflowName, log} = options; - let {id} = options; - - if (log === undefined) { - throw new Error('log is missing from options'); - } - - if (id === undefined) { - id = StatedWorkflow.generateUniqueId(); - options.id = id; - } - - StatedWorkflow.initializeLog(log, workflowName, id); - - let currentInput = input; - let serialOrdinal = 0; - for (let step of steps) { - const stepRecord = {invocationId: id, workflowName, stepName: step.name, serialOrdinal, branchType:"SERIAL"}; - currentInput = await this.executeStep(step, currentInput, log[workflowName][id], stepRecord); - serialOrdinal++; - if (step.next) this.workflow(currentInput, step.next, options); - } - - //this.finalizeLog(log[workflowName][id]); - //this.ensureRetention(log[workflowName]); - - return currentInput; - } - - async close() { if (this.pulsarClient !== undefined) { try { await this.pulsarClient.close(); } catch (error) { - console.error("Error closing pulsar client:", error); + this.logger.error("Error closing pulsar client:", error); } this.pulsarClient = undefined; } @@ -764,18 +628,14 @@ export class StatedWorkflow { this.templateProcessor.removeDataChangeCallback(this.changeListener); // TODO: check if consumers can be closed without client - // for (let consumer of StatedWorkflow.consumers.values()) { - // console.log(consumer); - // await consumer.disconnect(); - // } + try { if (this.workflowDispatcher) await this.workflowDispatcher.clear(); if (this.templateProcessor) await this.templateProcessor.close(); } catch (error) { - console.error("Error closing workflow dispatcher:", error); + this.logger.error("Error closing workflow dispatcher:", error); } clearInterval(this.snapshotInterval); - await this.workflowMetrics.shutdown(); } } diff --git a/src/workflow/obsolete/StatedWorkflowFuncs.js b/src/workflow/obsolete/StatedWorkflowFuncs.js index 44834c7..f193ba0 100644 --- a/src/workflow/obsolete/StatedWorkflowFuncs.js +++ b/src/workflow/obsolete/StatedWorkflowFuncs.js @@ -25,7 +25,7 @@ export class StatedWorkflowFuncs { let workflowStart = new Date().getTime(); if (workflowInvocation === undefined) { - workflowInvocation = StatedWorkflow.generateDateAndTimeBasedID(); + workflowInvocation = generateDateAndTimeBasedID(); } if (input === '__recover__' && stepJsons?.[0]) { @@ -53,7 +53,7 @@ export class StatedWorkflowFuncs { this.workflowMetrics.workflowInvocationLatency.record(new Date().getTime() - workflowStart, {workflowInvocation}); //we do not need to await this. Deletion can happen async - if (!tp.options.keepLogs) StatedWorkflow.deleteStepsLogs(workflowInvocation, steps) + if (!tp.options.keepLogs) deleteStepsLogs(workflowInvocation, steps) .catch(e=>this.templateProcessor.logger.error(`failed to delete completed log with invocation id '${workflowInvocation}'`)); return currentInput; @@ -76,7 +76,7 @@ export class StatedWorkflowFuncs { let {workflowInvocation} = context; if (workflowInvocation === undefined) { - workflowInvocation = StatedWorkflow.generateDateAndTimeBasedID(); + workflowInvocation = generateDateAndTimeBasedID(); } let promises = []; @@ -96,7 +96,7 @@ export class StatedWorkflowFuncs { let result = await Promise.all(promises); - if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps); + if (!tp.options.keepLogs) await deleteStepsLogs(workflowInvocation, steps); return result; } @@ -136,4 +136,115 @@ export class StatedWorkflowFuncs { throw new Error(`unknown courseOfAction: ${instruction}`); } } + + async workflow(input, steps, options={}) { + const {name: workflowName, log} = options; + let {id} = options; + + if (log === undefined) { + throw new Error('log is missing from options'); + } + + if (id === undefined) { + id = generateUniqueId(); + options.id = id; + } + + initializeLog(log, workflowName, id); + + let currentInput = input; + let serialOrdinal = 0; + for (let step of steps) { + const stepRecord = {invocationId: id, workflowName, stepName: step.name, serialOrdinal, branchType:"SERIAL"}; + currentInput = await this.executeStep(step, currentInput, log[workflowName][id], stepRecord); + serialOrdinal++; + if (step.next) this.workflow(currentInput, step.next, options); + } + + //this.finalizeLog(log[workflowName][id]); + //this.ensureRetention(log[workflowName]); + + return currentInput; + } + + + static async deleteStepsLogs(workflowInvocation, steps){ + await Promise.all(steps.map(s=>s.deleteLogs(workflowInvocation))); + } + + + // ensures that the log object has the right structure for the workflow invocation + static initializeLog(log, workflowName, id) { + if (!log[workflowName]) log[workflowName] = {}; + if (!log[workflowName][id]) log[workflowName][id] = { + info: { + start: new Date().getTime(), + status: 'in-progress' + }, + execution: {} + }; + } + + + + async executeStep(step, input, currentLog, stepRecord) { + /* + const stepLog = { + step: step.name, + start: new Date().getTime(), + args: [input] + }; + + */ + + if (currentLog.execution[stepRecord.stepName]?.out) { + console.log(`step ${step.name} has been already executed. Skipping`); + return currentLog.execution[stepRecord.stepName].out; + } + stepRecord["start"] = new Date().getTime(); + stepRecord["args"] = input; + + // we need to pass invocation id to the step expression + step.workflowInvocation = stepRecord.workflowInvocation; + + try { + const result = await step.function.apply(this, [input]); + stepRecord.end = new Date().getTime(); + stepRecord.out = result; + currentLog.execution[stepRecord.stepName] = stepRecord; + persistLogRecord(stepRecord); + return result; + } catch (error) { + stepRecord.end = new Date().getTime(); + stepRecord.error = {message: error.message}; + currentLog.info.status = 'failed'; + currentLog.execution[stepRecord.stepName] = stepRecord; + persistLogRecord(stepRecord); + throw error; + } + } + finalizeLog(currentLog) { + currentLog.info.end = new Date().getTime(); + if (currentLog.info.status !== 'failed') { + currentLog.info.status = 'succeeded'; + } + } + + ensureRetention(workflowLogs) { + const maxLogs = 100; + const sortedKeys = Object.keys(workflowLogs).sort((a, b) => workflowLogs[b].info.start - workflowLogs[a].info.start); + while (sortedKeys.length > maxLogs) { + const oldestKey = sortedKeys.pop(); + delete workflowLogs[oldestKey]; + } + } + + static async persistLogRecord(stepRecord) { + this.publish( + {'type': stepRecord.workflowName, 'data': stepRecord}, + {type:'pulsar', params: {serviceUrl: 'pulsar://localhost:6650'}} + ); + } + + } \ No newline at end of file