Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message listener #3

Open
wants to merge 4 commits into
base: add_evaluate_library
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions main.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export function initializeCqlWorker(cqlWorker, isNodeJs=false) {
// Unpack the message from the incoming event.
let expression = isNodeJs ? event.expression : event.data.expression;
let result = isNodeJs ? event.result : event.data.result;
let messages = isNodeJs ? event.messages : event.data.messages;

// If the response is that cqlWorker is still waiting on the patient bundle,
// wait 100 ms and resend.
Expand All @@ -40,7 +41,7 @@ export function initializeCqlWorker(cqlWorker, isNodeJs=false) {
// If the expression was found in the messageArray
if (executingExpressionIndex != -1) {
// Return the result by resolving the promise
messageArray[executingExpressionIndex].resolver(result);
messageArray[executingExpressionIndex].resolver({result, messages});
// Remove the matching entry from the array
messageArray.splice(executingExpressionIndex,1);
}
Expand Down Expand Up @@ -70,9 +71,10 @@ export function initializeCqlWorker(cqlWorker, isNodeJs=false) {
/**
* Sends an expression to the webworker for evaluation.
* @param {string} expression - The name of a CQL expression.
* @param {string} executionDateTime - Optional, ISO formatted execution datetime if not now
* @returns {boolean} - A dummy return value.
*/
const evaluateExpression = async function(expression) {
const evaluateExpression = async function(expression, executionDateTime = undefined) {
// If this expression is already on the message stack, return its index.
let executingExpressionIndex = messageArray.map((msg,idx) => {
if (msg.expression == expression) return idx;
Expand All @@ -92,7 +94,7 @@ export function initializeCqlWorker(cqlWorker, isNodeJs=false) {
});

// Send the entry to the Web Worker
cqlWorker.postMessage({expression: expression});
cqlWorker.postMessage({expression: expression, executionDateTime: executionDateTime});
// Return a promise that can be resolved after the web worker returns the result
return new Promise(resolve => messageArray[n-1].resolver = resolve);
} else {
Expand All @@ -104,8 +106,8 @@ export function initializeCqlWorker(cqlWorker, isNodeJs=false) {
* Sends a command to the webworker for to evaluate the entire library.
* @returns {boolean} - A dummy return value.
*/
const evaluateLibrary = async function() {
return evaluateExpression('__evaluate_library__');
const evaluateLibrary = async function(executionDateTime = undefined) {
return evaluateExpression('__evaluate_library__', executionDateTime);
};

return [
Expand Down
34 changes: 23 additions & 11 deletions src/CqlProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ export default class CqlProcessor {
elmJson,
valueSetJson,
parameters = null,
elmJsonDependencies = {}
elmJsonDependencies = {},
messageListener = undefined
) {
this.patientSource = fhir.PatientSource.FHIRv401();
this.repository = new cql.Repository({
Expand All @@ -27,10 +28,12 @@ export default class CqlProcessor {
});
this.library = new cql.Library(elmJson, this.repository);
this.codeService = new cql.CodeService(valueSetJson);
this.messageListener = messageListener;
this.executor = new cql.Executor(
this.library,
this.codeService,
parameters
parameters,
this.messageListener
);
}

Expand All @@ -50,20 +53,29 @@ export default class CqlProcessor {
* Evaluate an expression from the CQL library represented by elmJson against
* the patient bundle.
* @param {string} expr - The name of an expression from elmJson
* @param {string} executionDateTime - Optional, ISO formatted execution datetime if not now
* @returns {object} results - The results from executing the expression
*/
async evaluateExpression(expr) {
async evaluateExpression(expr, executionDateTime = undefined) {
// Only try to evaluate an expression if we have a patient bundle loaded.
if (this.patientSource._bundles && this.patientSource._bundles.length > 0) {
let results;
if (expr == "__evaluate_library__") {
results = await this.executor.exec(this.patientSource);
results = executionDateTime != undefined ?
await this.executor.exec(this.patientSource, cql.DateTime.parse(executionDateTime)):
await this.executor.exec(this.patientSource);
return results.patientResults[this.patientID];
} else {
results = await this.executor.exec_expression(
expr,
this.patientSource
);
} else {
results = executionDateTime != undefined ?
await this.executor.exec_expression(
expr,
this.patientSource,
cql.DateTime.parse(executionDateTime)
):
await this.executor.exec_expression(
expr,
this.patientSource
);
this.patientSource._index = 0; // HACK: rewind the patient source
return results.patientResults[this.patientID][expr];
}
Expand All @@ -75,7 +87,7 @@ export default class CqlProcessor {
* the patient bundle.
* @returns {object} results - The results from executing the expression
*/
async evaluateLibrary() {
return this.evaluateExpression("__evaluate_library__");
async evaluateLibrary(executionDateTime = undefined) {
return this.evaluateExpression("__evaluate_library__", executionDateTime);
}
}
15 changes: 12 additions & 3 deletions src/cql-worker-thread.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { parentPort } from "worker_threads";
import CqlProcessor from "./CqlProcessor.js";
import { MessageListener } from './messageListener.js'
var processor = {};
var messageListener = undefined;

/**
* TODO: Update this comment for node worker thread.
Expand All @@ -26,16 +28,21 @@ parentPort.onmessage = async function (rx) {
// the type called most often.
if ((expression = rx.data.expression) != null) {
let tx, result;
let executionDateTime = rx.data.executionDateTime === null ? undefined : rx.data.executionDateTime;
if (processor.patientSource._bundles.length > 0) {
if (expression == "__evaluate_library__") {
result = await processor.evaluateLibrary();
result = await processor.evaluateLibrary(executionDateTime);
} else {
result = await processor.evaluateExpression(expression);
result = await processor.evaluateExpression(expression, executionDateTime);
}
tx = {
expression: expression,
result: result,
};
if(messageListener){
tx.messages = messageListener.messages;
messageListener.messages = [];
}
} else {
// If we don't have a bundle just send the expression back.
tx = {
Expand All @@ -56,11 +63,13 @@ parentPort.onmessage = async function (rx) {
// CQL Processor object.
parameters = rx.data.parameters;
elmJsonDependencies = rx.data.elmJsonDependencies;
messageListener = new MessageListener();
processor = new CqlProcessor(
elmJson,
valueSetJson,
parameters,
elmJsonDependencies
elmJsonDependencies,
messageListener
);
}
};
14 changes: 11 additions & 3 deletions src/cql.worker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

import CqlProcessor from './CqlProcessor.js';
import { MessageListener } from './messageListener.js'
var processor = {};
var messageListener = undefined;

/**
* Define an event handler for when a message is sent to this web worker.
Expand All @@ -27,16 +29,21 @@ onmessage = async function(rx) {
if ((expression = rx.data.expression) != null) {

let tx,result;
let executionDateTime = rx.data.executionDateTime === null ? undefined : rx.data.executionDateTime;
if (processor.patientSource._bundles.length > 0) {
if(expression == '__evaluate_library__'){
result = await processor.evaluateLibrary();
result = await processor.evaluateLibrary(executionDateTime);
}else{
result = await processor.evaluateExpression(expression);
result = await processor.evaluateExpression(expression, executionDateTime);
}
tx = {
expression: expression,
result: result
};
if(messageListener){
tx.messages = messageListener.messages;
messageListener.messages = [];
}
} else {
// If we don't have a bundle just send the expression back.
tx = {
Expand All @@ -53,7 +60,8 @@ onmessage = async function(rx) {
// CQL Processor object.
parameters = rx.data.parameters;
elmJsonDependencies = rx.data.elmJsonDependencies;
processor = new CqlProcessor(elmJson, valueSetJson, parameters, elmJsonDependencies);
messageListener = new MessageListener();
processor = new CqlProcessor(elmJson, valueSetJson, parameters, elmJsonDependencies, messageListener);
}
}

24 changes: 24 additions & 0 deletions src/messageListener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
export class MessageListener {
constructor() {
// Internal array to store all message objects
this.messages = [];
}

/**
* cql-execution Message Listener implementation
* @param {Any} source - Source object returned unmodified by the Message operation
* @param {String} code - Token that is the coded representation of the error
* @param {String} severity - Token Trace | Message | Warning | Error
* @param {String} message - content of the actual message that is sent to the calling environment
* @returns {}
*/
onMessage(source, code, severity, message) {
// Add the message object to the internal array
this.messages.push({
source: JSON.stringify(source),
code: code,
severity: severity,
message: message
});
}
}