Skip to content

Commit

Permalink
Merge pull request #41 from jembi/add-orchestrations
Browse files Browse the repository at this point in the history
Add orchestrations
  • Loading branch information
bradsawadye authored May 21, 2024
2 parents b932896 + c6c9c85 commit 1e311b2
Show file tree
Hide file tree
Showing 27 changed files with 429 additions and 140 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mpi-mediator",
"version": "v2.1.1",
"version": "v2.3.0",
"description": "An OpenHIM mediator to handle all interactions with an MPI component",
"main": "index.ts",
"scripts": {
Expand Down
10 changes: 6 additions & 4 deletions src/middlewares/mpi-mdm-everything.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { RequestHandler } from 'express';
import logger from '../logger';
import { buildOpenhimResponseObject } from '../utils/utils';
import { MpiMediatorResponseObject } from '../types/response';
import { MpiMediatorResponseObject, Orchestration } from '../types/response';
import { fetchMpiPatientLinks } from '../utils/mpi';
import { fetchAllPatientResourcesByRefs } from '../routes/handlers/fetchPatientResources';

Expand All @@ -11,6 +11,8 @@ import { fetchAllPatientResourcesByRefs } from '../routes/handlers/fetchPatientR
const fetchAllLinkedPatientResources = async (
patientId: string
): Promise<MpiMediatorResponseObject> => {
const orchestrations: Orchestration[] = [];

try {
const patientRef = `Patient/${patientId}`;
const patientRefs: string[] = [];
Expand All @@ -19,18 +21,18 @@ const fetchAllLinkedPatientResources = async (
await fetchMpiPatientLinks(patientRef, patientRefs);

// Perform requests to HAPI FHIR to get everything for each patient ref
const bundle = await fetchAllPatientResourcesByRefs(patientRefs);
const bundle = await fetchAllPatientResourcesByRefs(patientRefs, orchestrations);

return {
status: 200,
body: buildOpenhimResponseObject('Success', 200, bundle),
body: buildOpenhimResponseObject('Successful', 200, bundle, 'application/fhir+json', orchestrations),
};
} catch (e) {
logger.error('Unable to fetch all linked patient resources (MDM expansion)', e);

return {
status: 500,
body: buildOpenhimResponseObject('Failed', 500, e as Error),
body: buildOpenhimResponseObject('Failed', 500, e as Error, 'application/fhir+json', orchestrations),
};
}
};
Expand Down
9 changes: 6 additions & 3 deletions src/middlewares/mpi-mdm-summary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,31 @@ import logger from '../logger';
import { fetchMpiPatientLinks } from '../utils/mpi';
import { buildOpenhimResponseObject } from '../utils/utils';
import { fetchAllPatientSummariesByRefs } from '../routes/handlers/fetchPatientSummaries';
import { Orchestration } from '../types/response';

const fetchAllLinkedPatientSummary = async (patientId: string, params: object) => {
const orchestrations: Orchestration[] = [];

try {
const patientRef = `Patient/${patientId}`;
const patientRefs: string[] = [];

await fetchMpiPatientLinks(patientRef, patientRefs);

const bundle = await fetchAllPatientSummariesByRefs(patientRefs, params);
const bundle = await fetchAllPatientSummariesByRefs(patientRefs, params, orchestrations);

logger.debug(`Fetched all patient summaries from the MPI: ${bundle}`);

return {
status: 200,
body: buildOpenhimResponseObject('Success', 200, bundle),
body: buildOpenhimResponseObject('Successful', 200, bundle, 'application/fhir+json', orchestrations),
};
} catch (e) {
logger.error('Unable to fetch all linked patient resources (MDM expansion)', e);

return {
status: 500,
body: buildOpenhimResponseObject('Failed', 500, e as Error),
body: buildOpenhimResponseObject('Failed', 500, e as Error, 'application/fhir+json', orchestrations),
};
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/middlewares/openhim-proxy-interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const openhimProxyResponseInterceptor = responseInterceptor(

if (isHttpStatusOk(statusCode)) {
logger.info('Successfully proxied request!');
transactionStatus = 'Success';
transactionStatus = 'Successful';
} else {
logger.error(`Error in validating: ${JSON.stringify(body)}!`);
transactionStatus = 'Failed';
Expand Down
2 changes: 1 addition & 1 deletion src/middlewares/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const validationMiddleware: RequestHandler = async (req, res, next) => {
{ 'Content-Type': 'application/fhir+json' }
);

transactionStatus = 'Success';
transactionStatus = 'Successful';

if (isHttpStatusOk(response.status)) {
logger.info('Successfully validated bundle!');
Expand Down
2 changes: 1 addition & 1 deletion src/openhim/mediatorConfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"urn": "urn:mediator:mpi-mediator",
"version": "v2.0.0",
"version": "v2.3.0",
"name": "MPI mediator",
"description": "A mediator handling interactions between the OpenHIM Core service, Sante MPI, Hapi-FHIR, and Kafka",
"defaultChannelConfig": [
Expand Down
138 changes: 85 additions & 53 deletions src/routes/handlers/fetchPatient.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,52 @@
import { Bundle } from 'fhir/r3';
import format from 'date-fns/format';

import { getConfig } from '../../config/config';
import logger from '../../logger';
import { MpiMediatorResponseObject } from '../../types/response';
import { MpiMediatorResponseObject, Orchestration } from '../../types/response';
import { getMpiAuthToken } from '../../utils/mpi';
import {
getData,
isHttpStatusOk,
createNewPatientRef,
patientProjector,
createHandlerResponseObject,
createClientRegistryOrcherstation,
createFhirDatastoreOrcherstation,
} from '../../utils/utils';
import { Patient } from 'fhir/r3';

const {
fhirDatastoreProtocol: fhirProtocol,
fhirDatastoreHost: fhirHost,
fhirDatastorePort: fhirPort,
mpiProtocol: mpiProtocol,
mpiHost: mpiHost,
mpiPort: mpiPort,
mpiProtocol,
mpiHost,
mpiPort,
mpiAuthEnabled,
} = getConfig();

const headers: HeadersInit = {
'Content-Type': 'application/fhir+json',
};

export const fetchPatientByQuery = async (
query: object
): Promise<MpiMediatorResponseObject> => {
const combinedParams = new URLSearchParams(query as Record<string, string>).toString();

const headers: HeadersInit = {
'Content-Type': 'application/fhir+json',
};
const orchestrations: Orchestration[] = [];

if (mpiAuthEnabled) {
const token = await getMpiAuthToken();

headers['Authorization'] = `Bearer ${token.accessToken}`;
}

const path = `/fhir/Patient?${combinedParams}`;

orchestrations.push(createClientRegistryOrcherstation('Match by query',path));

const mpiResponse = await getData(
mpiProtocol,
mpiHost,
Expand All @@ -45,52 +55,66 @@ export const fetchPatientByQuery = async (
headers
);

orchestrations[0].response.status = mpiResponse.status;
orchestrations[0].response.body = JSON.stringify(mpiResponse.body);
orchestrations[0].response.timestamp = format(new Date(), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

const promises: any[] = [];

if (isHttpStatusOk(mpiResponse.status)) {
const bundle = mpiResponse.body as Bundle;

logger.debug(`Adding patient link FHIR store`);

addPatientLinks(promises, bundle);
addPatientLinks(promises, bundle, orchestrations);
} else {
return createHandlerResponseObject('Failed', mpiResponse);
return createHandlerResponseObject('Failed', mpiResponse, orchestrations);
}

try {
const entries = await Promise.all(promises);

return createHandlerResponseObject('Successful', {
status: 200,
body: {
resourceType: 'Bundle',
id: combinedParams,
type: 'searchset',
total: entries.length,
entries: entries,
return createHandlerResponseObject(
'Successful',
{
status: 200,
body: {
resourceType: 'Bundle',
id: combinedParams,
type: 'searchset',
total: entries.length,
entries: entries,
},
},
});
orchestrations
);
} catch (err) {
const status = (err as any).status || 500;
const body = (err as any).body || {};

logger.error('Failed to retrieve patient ', body);

return createHandlerResponseObject('Failed', { status, body });
return createHandlerResponseObject('Failed', { status, body }, orchestrations);
}
};

export const fetchPatientById = async (
requestedId: string,
projection: string
): Promise<MpiMediatorResponseObject> => {
const fhirResponse = await getData(
fhirProtocol,
fhirHost,
fhirPort,
`/fhir/Patient/${requestedId}`,
{}
);
const headers: HeadersInit = {
'Content-Type': 'application/fhir+json',
};
const orchestrations: Orchestration[] = [];
let path: string = `/fhir/Patient/${requestedId}`;

orchestrations.push(createFhirDatastoreOrcherstation('Get gutted patient', path));

const fhirResponse = await getData(fhirProtocol, fhirHost, fhirPort, path, headers);

orchestrations[0].response.status = fhirResponse.status;
orchestrations[0].response.body = JSON.stringify(fhirResponse.body);
orchestrations[0].response.timestamp = format(new Date(), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

let upstreamId = requestedId;

Expand All @@ -104,28 +128,25 @@ export const fetchPatientById = async (
logger.debug(`Swapping source ID ${requestedId} for interaction ID ${upstreamId}`);
}
} else {
return createHandlerResponseObject('Failed', fhirResponse);
return createHandlerResponseObject('Failed', fhirResponse, orchestrations);
}

logger.debug(`Fetching patient ${upstreamId} from MPI`);

const headers: HeadersInit = {
'Content-Type': 'application/fhir+json',
};

if (mpiAuthEnabled) {
const token = await getMpiAuthToken();

headers['Authorization'] = `Bearer ${token.accessToken}`;
}

const mpiResponse = await getData(
mpiProtocol,
mpiHost,
mpiPort,
`/fhir/links/Patient/${upstreamId}`,
headers
);
path = `/fhir/links/Patient/${upstreamId}`;
orchestrations.push(createClientRegistryOrcherstation('Get full patient', path));

const mpiResponse = await getData(mpiProtocol, mpiHost, mpiPort, path, headers);

orchestrations[1].response.status = mpiResponse.status;
orchestrations[1].response.body = JSON.stringify(mpiResponse.body);
orchestrations[1].response.timestamp = format(new Date(), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

let transactionStatus = 'Successful';

Expand All @@ -144,19 +165,26 @@ export const fetchPatientById = async (
transactionStatus = 'Failed';
}

return createHandlerResponseObject(transactionStatus, mpiResponse);
return createHandlerResponseObject(transactionStatus, mpiResponse, orchestrations);
};

const addPatientLinks = (promises: any[], bundle: Bundle): void => {
const addPatientLinks = (
promises: any[],
bundle: Bundle,
orchestrations: Orchestration[]
): void => {
bundle.entry?.forEach((patient, index) => {
const path = `/fhir/Patient/${encodeURIComponent(patient.resource?.id || '')}`;

const promise = new Promise(async (resolve, reject) => {
const mpiLinksResponse = await getData(
mpiProtocol,
mpiHost,
mpiPort,
`/fhir/Patient/${encodeURIComponent(patient.resource?.id || '')}`,
{}
);
const orchestration: Orchestration = createClientRegistryOrcherstation('adding patient links', path);

const mpiLinksResponse = await getData(mpiProtocol, mpiHost, mpiPort, path, {});

orchestration.response.status = mpiLinksResponse.status;
orchestration.response.body = JSON.stringify(mpiLinksResponse.body);
orchestration.response.timestamp = format(new Date(), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
orchestrations.push(orchestration);

if (isHttpStatusOk(mpiLinksResponse.status)) {
const patient = mpiLinksResponse.body as Patient;
Expand All @@ -165,13 +193,17 @@ const addPatientLinks = (promises: any[], bundle: Bundle): void => {
createNewPatientRef(link.other.reference?.split('/').pop() || '')
) || [];

const fhirResponse = await getData(
fhirProtocol,
fhirHost,
fhirPort,
`/fhir/Patient?link=${encodeURIComponent(links.join(','))}`,
{}
const path: string = `/fhir/Patient?link=${encodeURIComponent(links.join(','))}`;
const orchestration: Orchestration = createFhirDatastoreOrcherstation(
'adding patient links',
path
);
const fhirResponse = await getData(fhirProtocol, fhirHost, fhirPort, path, {});

orchestration.response.status = fhirResponse.status;
orchestration.response.body = JSON.stringify(fhirResponse.body);
orchestration.response.timestamp = format(new Date(), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
orchestrations.push(orchestration);

if (!isHttpStatusOk(fhirResponse.status)) {
reject(fhirResponse);
Expand Down
Loading

0 comments on commit 1e311b2

Please sign in to comment.