Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use logger for logging #36

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions example/joinResistance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ subscribeParams: #parameters for subscribing to an event
type: test
acks: []
rebel: "luke"
joinResistance: |
/${(
$details := rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0].name;
$joined( "/rebelForces/-", $details);
)}
joinResistance: |
/${(
start;
$details := rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0].name;
$joined( "/rebelForces/-", $details);
)}
# starts producer function
send$: $publish(produceParams)
# starts consumer function
Expand Down
182 changes: 21 additions & 161 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ export class StatedWorkflow {
this.snapshotOpts = context.snapshot || {storage: 'fs', basePath: './.state'}
this.storage = storage || createStorage(this.snapshotOpts);
this.snapshotManager = new SnapshotManager(this.snapshotOpts, this.storage);

// create metrics provider
this.workflowMetrics = new WorkflowMetrics();

this.consumers = new Map(); //key is type, value is pulsar consumer
this.dispatchers = new Map(); //key is type, value Set of WorkflowDispatcher

Expand All @@ -82,8 +78,6 @@ export class StatedWorkflow {
"onHttp": this.onHttp.bind(this),
"publish": this.publish.bind(this),
"logFunctionInvocation": this.logFunctionInvocation.bind(this),
"workflow": this.workflow.bind(this),
// "recover": this.recover.bind(this),
"sleep": Delay.start,
"ack": this.ack.bind(this),
}
Expand Down Expand Up @@ -138,7 +132,7 @@ export class StatedWorkflow {

//
async ack(data) {
console.log(`acknowledging data: ${StatedREPL.stringify(data)}`);
this.templateProcessor.logger.debug(`acknowledging data: ${StatedREPL.stringify(data)}`);
const dispatcherType = this.workflowDispatcher.dispatchers.get(data.type);
for (let t of dispatcherType) {
const dispatcher = this.workflowDispatcher.dispatcherObjects.get(t);
Expand All @@ -156,19 +150,6 @@ export class StatedWorkflow {

}

// setWorkflowPersistence() {
// const storage = new Storage({workflowName: this.templateProcessor.input.name});
// const cbFn = async (data, jsonPtr, removed) => {
// try {
// await storage.persist(this.templateProcessor);
// } catch (error) {
// console.error(`Error persisting workflow state: ${error}`);
// }
// }
// this.templateProcessor.removeDataChangeCallback('/');
// this.templateProcessor.setDataChangeCallback('/',cbFn);
// }

async logFunctionInvocation(stage, args, result, error = null, log) {
const logMessage = {
context: stage.name,
Expand All @@ -186,7 +167,7 @@ export class StatedWorkflow {
logMessage.finish = new Date().toISOString();
logMessage.out = result;
}
console.log(StatedREPL.stringify(logMessage));
this.logger.debug(StatedREPL.stringify(logMessage));

// Assuming 'logs' array is inside 'log' object
if (log.logs) {
Expand Down Expand Up @@ -280,7 +261,7 @@ export class StatedWorkflow {
],
});
} catch (err) {
console.error(`Error publishing to Kafka: ${err}`);
this.logger.error(`Error publishing to Kafka: ${err}`);
} finally {
// Close the producer when done
await producer.disconnect();
Expand Down Expand Up @@ -379,14 +360,11 @@ export class StatedWorkflow {
const messageDataStr = message.getData().toString();
messageData = JSON.parse(messageDataStr);
} catch (error) {
console.error("unable to parse data to json:", error);
this.templateProcessor.logger.error("unable to parse data to json:", error);
// TODO - should we acknowledge the message here?
continue;
}
let resolve;
this.latch = new Promise((_resolve) => {
resolve = _resolve; //we assign our resolve variable that is declared outside this promise so that our onDataChange callbacks can use it
});

// create a callback to acknowledge the message
const dataAckCallback = async () => {
Expand All @@ -399,7 +377,7 @@ export class StatedWorkflow {
break;
}
} catch (error) {
console.error("Error receiving or dispatching message:", error);
this.templateProcessor.logger.error("Error receiving or dispatching message:", error);
} finally {
if (this.pulsarClient === undefined) {
break;
Expand All @@ -410,7 +388,7 @@ export class StatedWorkflow {
try {
await consumer.close();
} catch (error) {
console.error("Error closing consumer:", error);
this.templateProcessor.logger.error("Error closing consumer:", error);
}
})();
}
Expand Down Expand Up @@ -458,10 +436,10 @@ export class StatedWorkflow {
try {
data = await registry.decode(message.value);
} catch (error) {
console.error("Unable to parse data to JSON:", error);
this.logger.error("Unable to parse data to JSON:", error);
}
const ackFunction = async (data2ack) => {
console.log(`acknowledging data: ${StatedREPL.stringify(data)} with data2ack: ${StatedREPL.stringify(data2ack)}`);
this.logger.log(`acknowledging data: ${StatedREPL.stringify(data)} with data2ack: ${StatedREPL.stringify(data2ack)}`);
// TODO: add ack logic
}
await this.workflowDispatcher.dispatchToAllSubscribers(type, data, ackFunction);
Expand Down Expand Up @@ -510,16 +488,9 @@ export class StatedWorkflow {
const str = message.value.toString();
data = JSON.parse(str);
} catch (error) {
console.error("Unable to parse data to JSON:", error);
}
const ackFunction = async (data) => {
// TODO: make the below code working
// const currentOffset = this.templateProcessor.output(subscribeParamsJsonPointer + 'offset',);
// if (currentOffset < message.offset + 1) {
// await consumer.commitOffsets([{ topic, partition, offset: message.offset + 1 }]);
// this.templateProcessor.setData(subscribeParamsJsonPointer + 'offset', message.offset + 1;
// }
this.logger.error("Unable to parse data to JSON:", error);
}

this.workflowDispatcher.dispatchToAllSubscribers(type, data, dataAckCallback);


Expand All @@ -545,7 +516,7 @@ export class StatedWorkflow {
testDataAckFunctionGenerator = (data) => {
return (async () => {
if (Array.isArray(clientParams.acks)) {
console.debug(`acknowledging data: ${StatedREPL.stringify(data)}`);
this.logger.debug(`acknowledging data: ${StatedREPL.stringify(data)}`);
await this.templateProcessor.setData(subscribeParamsJsonPointer + '/client/acks/-',data);
}
}).bind(this);
Expand Down Expand Up @@ -582,23 +553,23 @@ export class StatedWorkflow {
if(clientParams.type === "test"){
await this.subscribeTest(subscriptionParams, subscribeParamsJsonPointer);
} else if (clientType === 'dispatcher') {
this.logger.debug(`No 'real' subscription created because client.type='dispatcher' set for subscription params ${StatedREPL.stringify(subscriptionParams)}`);
this.templateProcessor.logger.debug(`No 'real' subscription created because client.type='dispatcher' set for subscription params ${StatedREPL.stringify(subscriptionParams)}`);
this.workflowDispatcher.getDispatcher(subscriptionParams);
} else if (clientType === 'http') {
this.onHttp(subscriptionParams);
} else if (clientType === 'cop') {
this.logger.debug(`subscribing to cop cloud event sources ${clientParams}`)
this.templateProcessor.logger.debug(`subscribing to cop cloud event sources ${clientParams}`)
this.subscribeCOPKafka(subscriptionParams);
}else if (clientType === 'kafka') {
this.logger.debug(`subscribing to kafka using ${clientParams}`)
this.templateProcessor.logger.debug(`subscribing to kafka using ${clientParams}`)
this.createKafkaClient(clientParams);
this.subscribeKafka(subscriptionParams);
}else if(clientType === 'pulsar') {
this.logger.debug(`subscribing to pulsar (default) using ${clientParams}`)
this.templateProcessor.logger.debug(`subscribing to pulsar (default) using ${clientParams}`)
this.createPulsarClient(clientParams);
this.subscribePulsar(subscriptionParams);
}else if(clientType === 'pulsarMock'){
this.logger.debug(`subscribing to pulsarMock using ${clientParams}`)
this.templateProcessor.logger.debug(`subscribing to pulsarMock using ${clientParams}`)
this.createPulsarClientMock(clientParams);
this.subscribePulsar(subscriptionParams);
}else{
Expand All @@ -613,15 +584,15 @@ export class StatedWorkflow {
this.app = express();
this.app.use(express.json());
this.app.listen(this.port, () => {
console.log(`Server started on http://localhost:${StatedWorkflow.port}`);
this.templateProcessor.logger.log(`Server started on http://localhost:${StatedWorkflow.port}`);
});
// Path = /workflow/:workflowId
// workflowIdToWorkflowDispatcher
if (subscriptionParams.type === undefined) subscriptionParams.type = 'default-type';
if (subscriptionParams.subscriberId === undefined) subscriptionParams.subscriberId = 'default-subscriberId';
const dispatcher = this.workflowDispatcher.getDispatcher(subscriptionParams);
this.app.all('*', async (req, res) => {
console.debug("Received HTTP request: ", req.body, req.method, req.url);
this.templateProcessor.logger.debug("Received HTTP request: ", req.body, req.method, req.url);
// Push the request and response objects to the dispatch queue to be handled by callback
await dispatcher.addToQueue(req.body, ()=>{ res.send("sucess")});
});
Expand All @@ -630,82 +601,6 @@ export class StatedWorkflow {

}

static async deleteStepsLogs(workflowInvocation, steps){
await Promise.all(steps.map(s=>s.deleteLogs(workflowInvocation)));
}

// ensures that the log object has the right structure for the workflow invocation
static initializeLog(log, workflowName, id) {
if (!log[workflowName]) log[workflowName] = {};
if (!log[workflowName][id]) log[workflowName][id] = {
info: {
start: new Date().getTime(),
status: 'in-progress'
},
execution: {}
};
}

static async persistLogRecord(stepRecord) {
this.publish(
{'type': stepRecord.workflowName, 'data': stepRecord},
{type:'pulsar', params: {serviceUrl: 'pulsar://localhost:6650'}}
);
}


async executeStep(step, input, currentLog, stepRecord) {
/*
const stepLog = {
step: step.name,
start: new Date().getTime(),
args: [input]
};

*/

if (currentLog.execution[stepRecord.stepName]?.out) {
console.log(`step ${step.name} has been already executed. Skipping`);
return currentLog.execution[stepRecord.stepName].out;
}
stepRecord["start"] = new Date().getTime();
stepRecord["args"] = input;

// we need to pass invocation id to the step expression
step.workflowInvocation = stepRecord.workflowInvocation;

try {
const result = await step.function.apply(this, [input]);
stepRecord.end = new Date().getTime();
stepRecord.out = result;
currentLog.execution[stepRecord.stepName] = stepRecord;
StatedWorkflow.persistLogRecord(stepRecord);
return result;
} catch (error) {
stepRecord.end = new Date().getTime();
stepRecord.error = {message: error.message};
currentLog.info.status = 'failed';
currentLog.execution[stepRecord.stepName] = stepRecord;
StatedWorkflow.persistLogRecord(stepRecord);
throw error;
}
}
finalizeLog(currentLog) {
currentLog.info.end = new Date().getTime();
if (currentLog.info.status !== 'failed') {
currentLog.info.status = 'succeeded';
}
}

ensureRetention(workflowLogs) {
const maxLogs = 100;
const sortedKeys = Object.keys(workflowLogs).sort((a, b) => workflowLogs[b].info.start - workflowLogs[a].info.start);
while (sortedKeys.length > maxLogs) {
const oldestKey = sortedKeys.pop();
delete workflowLogs[oldestKey];
}
}

static generateUniqueId() {
return `${new Date().getTime()}-${Math.random().toString(36).slice(2, 7)}`;
}
Expand All @@ -719,63 +614,28 @@ export class StatedWorkflow {
return `${dateStr}-${timeInMs}-${randomPart}`;
}

async workflow(input, steps, options={}) {
const {name: workflowName, log} = options;
let {id} = options;

if (log === undefined) {
throw new Error('log is missing from options');
}

if (id === undefined) {
id = StatedWorkflow.generateUniqueId();
options.id = id;
}

StatedWorkflow.initializeLog(log, workflowName, id);

let currentInput = input;
let serialOrdinal = 0;
for (let step of steps) {
const stepRecord = {invocationId: id, workflowName, stepName: step.name, serialOrdinal, branchType:"SERIAL"};
currentInput = await this.executeStep(step, currentInput, log[workflowName][id], stepRecord);
serialOrdinal++;
if (step.next) this.workflow(currentInput, step.next, options);
}

//this.finalizeLog(log[workflowName][id]);
//this.ensureRetention(log[workflowName]);

return currentInput;
}


async close() {

if (this.pulsarClient !== undefined) {
try {
await this.pulsarClient.close();
} catch (error) {
console.error("Error closing pulsar client:", error);
this.logger.error("Error closing pulsar client:", error);
}
this.pulsarClient = undefined;
}

this.templateProcessor.removeDataChangeCallback(this.changeListener);

// TODO: check if consumers can be closed without client
// for (let consumer of StatedWorkflow.consumers.values()) {
// console.log(consumer);
// await consumer.disconnect();
// }

try {
if (this.workflowDispatcher) await this.workflowDispatcher.clear();
if (this.templateProcessor) await this.templateProcessor.close();

} catch (error) {
console.error("Error closing workflow dispatcher:", error);
this.logger.error("Error closing workflow dispatcher:", error);
}
clearInterval(this.snapshotInterval);
await this.workflowMetrics.shutdown();
}
}
Loading
Loading