Skip to content

Commit

Permalink
feat: integrate historic disease data extraction and storage in Click…
Browse files Browse the repository at this point in the history
…House
  • Loading branch information
brett-onions committed Mar 4, 2025
1 parent 422b45e commit 7fe00b1
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 44 deletions.
94 changes: 55 additions & 39 deletions src/routes/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import express from 'express';
import multer from 'multer';
import { getConfig } from '../config/config';
import { getCsvHeaders, validateBucketName } from '../utils/file-validators';
import {
extractHistoricData,
getCsvHeaders,
validateBucketName,
} from '../utils/file-validators';
import logger from '../logger';
import fs from 'fs/promises';
import path from 'path';
Expand All @@ -14,9 +18,10 @@ import {
} from '../utils/minioClient';
import { registerBucket } from '../openhim/openhim';
import axios from 'axios';
import FormData from 'form-data'
import FormData from 'form-data';
import { createClient } from '@clickhouse/client';
import { ModelPredictionUsingChap } from '../services/ModelPredictionUsingChap';
import { createHistoricalDiseaseTable, insertHistoricDiseaseData } from '../utils/clickhouse';

// Constants
const VALID_MIME_TYPES = ['text/csv', 'application/json'] as const;
Expand Down Expand Up @@ -73,7 +78,7 @@ const validateJsonFile = (buffer: Buffer): boolean => {
// File handlers
const handleCsvFile = async (
files: Express.Multer.File[],
bucket: string,
bucket: string
): Promise<UploadResponse> => {
try {
for (const file of files) {
Expand All @@ -100,7 +105,7 @@ const handleCsvFile = async (
const handleJsonFile = async (
file: Express.Multer.File,
bucket: string,
region: string,
region: string
): Promise<UploadResponse> => {
if (!validateJsonFile(file.buffer)) {
return createErrorResponse('INVALID_JSON_FORMAT', 'Invalid JSON file format');
Expand All @@ -126,15 +131,19 @@ const handleJsonFile = async (
}
};

const handleJsonPayload = async (file: Express.Multer.File, json: Object, bucket: string): Promise<UploadResponse> => {
const handleJsonPayload = async (
file: Express.Multer.File,
json: Object,
bucket: string
): Promise<UploadResponse> => {
try {
const uploadResult = await uploadFileBufferToMinio(
Buffer.from(JSON.stringify(json)),
file.originalname,
bucket,
file.mimetype
);

return uploadResult.success
? createSuccessResponse('UPLOAD_SUCCESS', uploadResult.message)
: createErrorResponse('UPLOAD_FAILED', uploadResult.message);
Expand Down Expand Up @@ -236,18 +245,13 @@ routes.post('/upload', async (req, res) => {
await ensureBucketExists(bucket, createBucketIfNotExists);
getPrediction(trainingFileFormData, historicFutureFormData, bucket);



if (createBucketIfNotExists && getConfig().runningMode !== 'testing') {
await registerBucket(bucket);
}

let response: UploadResponse;
if (trainingFile.mimetype === 'text/csv') {
response = await handleCsvFile(
[trainingFile, historicFile, futureFile],
bucket
);
response = await handleCsvFile([trainingFile, historicFile, futureFile], bucket);
} else {
response = createErrorResponse('INVALID_FILE_TYPE', 'Invalid file type');
}
Expand All @@ -274,43 +278,45 @@ routes.post('/upload', async (req, res) => {
});
});

async function getPrediction(trainingFileFormData: FormData, historicFutureFormData: FormData, bucket: string) {
async function getPrediction(
trainingFileFormData: FormData,
historicFutureFormData: FormData,
bucket: string
) {
try {
const { chapApiUrl } = getConfig();
const { url, password } = getConfig().clickhouse;
const client = createClient({
url,
password,
});
url,
password,
});

const trainingResults = await axios.post(chapApiUrl + '/train', trainingFileFormData, {
const trainingResults = await axios.post(chapApiUrl + '/train', trainingFileFormData, {
headers: {
...trainingFileFormData.getHeaders()
...trainingFileFormData.getHeaders(),
},
})
});

logger.debug(`CHAP Training Results: ${trainingResults.status === 201 ? 'Upload Successful':'Upload Failed'}`)
logger.debug(
`CHAP Training Results: ${trainingResults.status === 201 ? 'Upload Successful' : 'Upload Failed'}`
);

const prediction = await axios.post(chapApiUrl + '/predict', historicFutureFormData, {
headers: {
...historicFutureFormData.getHeaders()
...historicFutureFormData.getHeaders(),
},
})

logger.debug(`CHAP Prediction Results: ${prediction.status === 201 ? 'Successful Received Prediction':'Failed to Received Prediction'}`);
const { predictions } = prediction.data
});

logger.debug(
`CHAP Prediction Results: ${prediction.status === 201 ? 'Successful Received Prediction' : 'Failed to Received Prediction'}`
);
const { predictions } = prediction.data;
const stringifiedPrediction = JSON.stringify(predictions);
const originalFileName = `prediction-result.json`;
const fileUrl = await saveToTmp(Buffer.from(stringifiedPrediction), originalFileName);

await uploadToMinio(
fileUrl,
originalFileName,
bucket,
'application/json'
);
await uploadToMinio(fileUrl, originalFileName, bucket, 'application/json');
await fs.unlink(fileUrl);

} catch (error) {
logger.error(`Failed to receive prediction: ${error}`);
}
Expand All @@ -319,8 +325,8 @@ async function getPrediction(trainingFileFormData: FormData, historicFutureFormD
routes.post('/predict', upload.single('file'), async (req, res) => {
try {
const file = req.file;
const region = process.env.MINIO_BUCKET_REGION
const chapUrl = process.env.CHAP_URL
const region = process.env.MINIO_BUCKET_REGION;
const chapUrl = process.env.CHAP_URL;

if (!chapUrl) {
logger.error('Chap URL not set');
Expand All @@ -332,32 +338,40 @@ routes.post('/predict', upload.single('file'), async (req, res) => {
return res.status(400).json(createErrorResponse('FILE_MISSING', 'No file uploaded'));
}

try {
const historicData = extractHistoricData(file.buffer.toString());
await createHistoricalDiseaseTable();
await insertHistoricDiseaseData(historicData);
} catch (error) {
logger.error('There was an issue inserting the Historic Data: ' + JSON.stringify(error));
}

const modelPrediction = new ModelPredictionUsingChap(chapUrl, logger);

// start the Chap prediction job
const predictResponse = await modelPrediction.predict({ data: file.buffer.toString() });

if (predictResponse?.status === 'success') {
// wait for the prediction job to finish
const predictionResults = await new Promise((resolve, reject) => {
const predictionResults = (await new Promise((resolve, reject) => {
const interval = setInterval(async () => {
const statusResponse = await modelPrediction.getStatus();
if (statusResponse?.status === 'idle' && statusResponse?.ready) {
clearInterval(interval);
resolve((await modelPrediction.getResult()).data);
}
}, 250);
}) as any;
})) as any;

const bucketName = sanitizeBucketName(
`${file.originalname.split('.')[0]}-${Math.round(new Date().getTime() / 1000)}`
)
);

const predictionResultsForMinio = predictionResults?.dataValues?.map((d: any) => {
return {
...d,
diseaseId: predictionResults.diseaseId as string,
}
};
});

await ensureBucketExists(bucketName, true);
Expand All @@ -367,7 +381,9 @@ routes.post('/predict', upload.single('file'), async (req, res) => {
return res.status(200).json(predictionResultsForMinio);
}

return res.status(500).json({ error: 'Error predicting model. Error response from Chap API' });
return res
.status(500)
.json({ error: 'Error predicting model. Error response from Chap API' });
} catch (err) {
logger.error('Error predicting model:');
logger.error(err);
Expand Down
60 changes: 55 additions & 5 deletions src/utils/clickhouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ export async function createTable(fields: string[], tableName: string) {
/**
* Create a table within clickhouse from the inferred schema from the json file
* if table already exists within the clickhouse function will return false
*
*
* @param s3Path URL location of the json within Minio
* @param s3Config Access key and Secret key credentials to access Minio
* @param tableName The name of the table to be created within clickhouse
* @param groupByColumnName The column the created table will be ORDERED By within clickhouse
* @returns
* @returns
*/

export async function createTableFromJson(
Expand Down Expand Up @@ -81,10 +81,14 @@ export async function createTableFromJson(
}

try {

logger.info(`Creating table from JSON ${normalizedTableName}`);

const query = generateDDLFromJson(s3Path, s3Config, normalizedTableName, groupByColumnName);
const query = generateDDLFromJson(
s3Path,
s3Config,
normalizedTableName,
groupByColumnName
);
const res = await client.query({ query });

logger.info(`Successfully created table from JSON ${normalizedTableName}`);
Expand All @@ -98,7 +102,6 @@ export async function createTableFromJson(
logger.error(err);
return false;
}

}

export function generateDDL(fields: string[], tableName: string) {
Expand Down Expand Up @@ -202,3 +205,50 @@ export async function insertFromS3Json(
await client.close();
}
}

/**
 * Ensure the `historical_disease` table exists in ClickHouse.
 *
 * The statement uses `CREATE TABLE IF NOT EXISTS`, so calling this when the
 * table is already present does not fail. Failures while creating the table
 * are logged and NOT rethrown; the connection is closed in every case.
 *
 * NOTE(review): the connection URL and password are hard-coded here; consider
 * sourcing them from the application configuration instead.
 *
 * @returns A promise that settles once the client connection has been closed.
 */
export async function createHistoricalDiseaseTable(): Promise<void> {
  const client = createClient({
    url: 'http://localhost:8123',
    password: 'dev_password_only',
  });

  try {
    logger.debug('Now creating table');
    // DDL has no result set, so use `command` rather than `query`; `query`
    // hands back a response stream that would otherwise be left unconsumed.
    await client.command({
      query: `
        CREATE TABLE IF NOT EXISTS historical_disease (
          ou String,
          pe String,
          value Int64
        ) ENGINE = MergeTree()
        ORDER BY (ou)
      `,
    });
    logger.debug('Table created successfully');
  } catch (error) {
    // JSON.stringify on an Error yields "{}" (its fields are non-enumerable),
    // so log the message explicitly to keep the failure reason visible.
    const reason = error instanceof Error ? error.message : JSON.stringify(error);
    logger.error('There was an issue creating the table in clickhouse: ' + reason);
  } finally {
    await client.close();
  }
}

/**
 * Insert historic disease rows into the `historical_disease` ClickHouse table.
 *
 * Rows are sent in `JSONEachRow` format. Failures during the insert are logged
 * and NOT rethrown; the connection is closed in every case.
 *
 * NOTE(review): the connection URL and password are hard-coded here; consider
 * sourcing them from the application configuration instead.
 *
 * @param diseaseData Rows to insert, each carrying `ou`, `pe` and `value`.
 * @returns A promise that settles once the client connection has been closed.
 */
export async function insertHistoricDiseaseData(
  diseaseData: { ou: string; pe: string; value: number }[]
): Promise<void> {
  const client = createClient({
    url: 'http://localhost:8123',
    password: 'dev_password_only',
  });

  try {
    logger.debug('Now inserting data');
    await client.insert({
      table: 'historical_disease',
      values: diseaseData,
      format: 'JSONEachRow',
    });
    logger.debug('Insertion successful');
  } catch (error) {
    // JSON.stringify on an Error yields "{}" (its fields are non-enumerable),
    // so log the message explicitly to keep the failure reason visible.
    const reason = error instanceof Error ? error.message : JSON.stringify(error);
    logger.error('There was an issue inserting the data into clickhouse: ' + reason);
  } finally {
    await client.close();
  }
}
13 changes: 13 additions & 0 deletions src/utils/file-validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,16 @@ export function validateBucketName(bucket: string): boolean {
const regex = new RegExp(/^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$/);
return regex.test(bucket);
}

/**
 * Extract the historic disease case rows from a stringified JSON payload.
 *
 * The payload is expected to have a top-level `features` array; the entry
 * whose `featureId` is `'disease_cases'` supplies the rows through its `data`
 * array. Individual rows are returned as-is and are not validated here.
 *
 * @param jsonStringified The JSON payload as a string.
 * @returns The `data` array of the `disease_cases` feature.
 * @throws SyntaxError when `jsonStringified` is not valid JSON.
 * @throws Error when no `disease_cases` feature is present, or when its `data`
 *   is not an array.
 */
export function extractHistoricData(
  jsonStringified: string
): { ou: string; pe: string; value: number }[] {
  const jsonPayload: unknown = JSON.parse(jsonStringified);

  // Narrow step by step instead of suppressing the checker: a payload without
  // a `features` array is reported the same way as one lacking the feature.
  const features =
    typeof jsonPayload === 'object' && jsonPayload !== null
      ? (jsonPayload as { features?: unknown }).features
      : undefined;

  const diseaseCases: unknown = Array.isArray(features)
    ? features.find(
        (feature: unknown) =>
          typeof feature === 'object' &&
          feature !== null &&
          (feature as { featureId?: unknown }).featureId === 'disease_cases'
      )
    : undefined;

  if (diseaseCases === undefined) {
    throw new Error('Could not find historic disease data within payload');
  }

  const { data } = diseaseCases as { data?: unknown };
  if (!Array.isArray(data)) {
    // Without this guard the function would return `undefined` despite its
    // declared array return type.
    throw new Error('Historic disease data within payload is not an array');
  }

  return data as { ou: string; pe: string; value: number }[];
}

0 comments on commit 7fe00b1

Please sign in to comment.