diff --git a/src/routes/index.ts b/src/routes/index.ts index f282996..9e2fd24 100644 --- a/src/routes/index.ts +++ b/src/routes/index.ts @@ -1,7 +1,11 @@ import express from 'express'; import multer from 'multer'; import { getConfig } from '../config/config'; -import { getCsvHeaders, validateBucketName } from '../utils/file-validators'; +import { + extractHistoricData, + getCsvHeaders, + validateBucketName, +} from '../utils/file-validators'; import logger from '../logger'; import fs from 'fs/promises'; import path from 'path'; @@ -14,9 +18,10 @@ import { } from '../utils/minioClient'; import { registerBucket } from '../openhim/openhim'; import axios from 'axios'; -import FormData from 'form-data' +import FormData from 'form-data'; import { createClient } from '@clickhouse/client'; import { ModelPredictionUsingChap } from '../services/ModelPredictionUsingChap'; +import { createHistoricalDiseaseTable, insertHistoricDiseaseData } from '../utils/clickhouse'; // Constants const VALID_MIME_TYPES = ['text/csv', 'application/json'] as const; @@ -73,7 +78,7 @@ const validateJsonFile = (buffer: Buffer): boolean => { // File handlers const handleCsvFile = async ( files: Express.Multer.File[], - bucket: string, + bucket: string ): Promise => { try { for (const file of files) { @@ -100,7 +105,7 @@ const handleCsvFile = async ( const handleJsonFile = async ( file: Express.Multer.File, bucket: string, - region: string, + region: string ): Promise => { if (!validateJsonFile(file.buffer)) { return createErrorResponse('INVALID_JSON_FORMAT', 'Invalid JSON file format'); @@ -126,7 +131,11 @@ const handleJsonFile = async ( } }; -const handleJsonPayload = async (file: Express.Multer.File, json: Object, bucket: string): Promise => { +const handleJsonPayload = async ( + file: Express.Multer.File, + json: Object, + bucket: string +): Promise => { try { const uploadResult = await uploadFileBufferToMinio( Buffer.from(JSON.stringify(json)), @@ -134,7 +143,7 @@ const handleJsonPayload = async (file: Express.Multer.File, json: Object, bucket bucket, file.mimetype ); - + return uploadResult.success ? createSuccessResponse('UPLOAD_SUCCESS', uploadResult.message) : createErrorResponse('UPLOAD_FAILED', uploadResult.message); @@ -236,18 +245,13 @@ routes.post('/upload', async (req, res) => { await ensureBucketExists(bucket, createBucketIfNotExists); getPrediction(trainingFileFormData, historicFutureFormData, bucket); - - if (createBucketIfNotExists && getConfig().runningMode !== 'testing') { await registerBucket(bucket); } let response: UploadResponse; if (trainingFile.mimetype === 'text/csv') { - response = await handleCsvFile( - [trainingFile, historicFile, futureFile], - bucket - ); + response = await handleCsvFile([trainingFile, historicFile, futureFile], bucket); } else { response = createErrorResponse('INVALID_FILE_TYPE', 'Invalid file type'); } @@ -274,43 +278,45 @@ routes.post('/upload', async (req, res) => { }); }); -async function getPrediction(trainingFileFormData: FormData, historicFutureFormData: FormData, bucket: string) { +async function getPrediction( + trainingFileFormData: FormData, + historicFutureFormData: FormData, + bucket: string +) { try { const { chapApiUrl } = getConfig(); const { url, password } = getConfig().clickhouse; const client = createClient({ - url, - password, - }); + url, + password, + }); - const trainingResults = await axios.post(chapApiUrl + '/train', trainingFileFormData, { + const trainingResults = await axios.post(chapApiUrl + '/train', trainingFileFormData, { headers: { - ...trainingFileFormData.getHeaders() + ...trainingFileFormData.getHeaders(), }, - }) + }); - logger.debug(`CHAP Training Results: ${trainingResults.status === 201 ? 'Upload Successful':'Upload Failed'}`) + logger.debug( + `CHAP Training Results: ${trainingResults.status === 201 ? 'Upload Successful' : 'Upload Failed'}` + ); const prediction = await axios.post(chapApiUrl + '/predict', historicFutureFormData, { headers: { - ...historicFutureFormData.getHeaders() + ...historicFutureFormData.getHeaders(), }, - }) - - logger.debug(`CHAP Prediction Results: ${prediction.status === 201 ? 'Successful Received Prediction':'Failed to Received Prediction'}`); - const { predictions } = prediction.data + }); + + logger.debug( + `CHAP Prediction Results: ${prediction.status === 201 ? 'Successful Received Prediction' : 'Failed to Received Prediction'}` + ); + const { predictions } = prediction.data; const stringifiedPrediction = JSON.stringify(predictions); const originalFileName = `prediction-result.json`; const fileUrl = await saveToTmp(Buffer.from(stringifiedPrediction), originalFileName); - await uploadToMinio( - fileUrl, - originalFileName, - bucket, - 'application/json' - ); + await uploadToMinio(fileUrl, originalFileName, bucket, 'application/json'); await fs.unlink(fileUrl); - } catch (error) { logger.error(`Failed to receive prediction: ${error}`); } @@ -319,8 +325,8 @@ async function getPrediction(trainingFileFormData: FormData, historicFutureFormD routes.post('/predict', upload.single('file'), async (req, res) => { try { const file = req.file; - const region = process.env.MINIO_BUCKET_REGION - const chapUrl = process.env.CHAP_URL + const region = process.env.MINIO_BUCKET_REGION; + const chapUrl = process.env.CHAP_URL; if (!chapUrl) { logger.error('Chap URL not set'); @@ -332,6 +338,14 @@ routes.post('/predict', upload.single('file'), async (req, res) => { return res.status(400).json(createErrorResponse('FILE_MISSING', 'No file uploaded')); } + try { + const historicData = extractHistoricData(file.buffer.toString()); + await createHistoricalDiseaseTable(); + await insertHistoricDiseaseData(historicData); + } catch (error) { + logger.error('There was an issue inserting the Historic Data: ' + JSON.stringify(error)); + } + const modelPrediction = new ModelPredictionUsingChap(chapUrl, logger); // start the Chap prediction job @@ -339,7 +353,7 @@ routes.post('/predict', upload.single('file'), async (req, res) => { if (predictResponse?.status === 'success') { // wait for the prediction job to finish - const predictionResults = await new Promise((resolve, reject) => { + const predictionResults = (await new Promise((resolve, reject) => { const interval = setInterval(async () => { const statusResponse = await modelPrediction.getStatus(); if (statusResponse?.status === 'idle' && statusResponse?.ready) { @@ -347,17 +361,17 @@ routes.post('/predict', upload.single('file'), async (req, res) => { resolve((await modelPrediction.getResult()).data); } }, 250); - }) as any; + })) as any; const bucketName = sanitizeBucketName( `${file.originalname.split('.')[0]}-${Math.round(new Date().getTime() / 1000)}` - ) + ); const predictionResultsForMinio = predictionResults?.dataValues?.map((d: any) => { return { ...d, diseaseId: predictionResults.diseaseId as string, - } + }; }); await ensureBucketExists(bucketName, true); @@ -367,7 +381,9 @@ routes.post('/predict', upload.single('file'), async (req, res) => { return res.status(200).json(predictionResultsForMinio); } - return res.status(500).json({ error: 'Error predicting model. Error response from Chap API' }); + return res + .status(500) + .json({ error: 'Error predicting model. Error response from Chap API' }); } catch (err) { logger.error('Error predicting model:'); logger.error(err); diff --git a/src/utils/clickhouse.ts b/src/utils/clickhouse.ts index 2f7c44e..ea2697f 100644 --- a/src/utils/clickhouse.ts +++ b/src/utils/clickhouse.ts @@ -44,12 +44,12 @@ export async function createTable(fields: string[], tableName: string) { /** * Create a table within clickhouse from the inferred schema from the json file * if table already exists within the clickhouse function will return false - * + * * @param s3Path URL location of the json within Minio * @param s3Config Access key and Secrete key credentials to access Minio * @param tableName The name of the table to be created within Minio * @param groupByColumnName The column the created table will be ORDERED By within clickhouse - * @returns + * @returns */ export async function createTableFromJson( @@ -81,10 +81,14 @@ export async function createTableFromJson( } try { - logger.info(`Creating table from JSON ${normalizedTableName}`); - const query = generateDDLFromJson(s3Path, s3Config, normalizedTableName, groupByColumnName); + const query = generateDDLFromJson( + s3Path, + s3Config, + normalizedTableName, + groupByColumnName + ); const res = await client.query({ query }); logger.info(`Successfully created table from JSON ${normalizedTableName}`); @@ -98,7 +102,6 @@ export async function createTableFromJson( logger.error(err); return false; } - } export function generateDDL(fields: string[], tableName: string) { @@ -202,3 +205,50 @@ export async function insertFromS3Json( await client.close(); } } + +export async function createHistoricalDiseaseTable() { + const client = createClient({ + url: 'http://localhost:8123', + password: 'dev_password_only', + }); + + try { + logger.debug('Now creating table'); + await client.query({ + query: ` + CREATE TABLE IF NOT EXISTS historical_disease ( + ou String, + pe String, + value Int64 + ) ENGINE = MergeTree() + ORDER BY (ou) + `, + }); + logger.debug('Table created successfully'); + } catch (error) { + logger.error("There was an issue creating the table in clickhouse: " + JSON.stringify(error)); + } + return client.close(); +} + +export async function insertHistoricDiseaseData( + diseaseData: { ou: string; pe: string; value: number }[] +) { + const client = createClient({ + url: 'http://localhost:8123', + password: 'dev_password_only', + }); + + try { + logger.debug('Now inserting data'); + await client.insert({ + table: 'historical_disease', + values: diseaseData, + format: 'JSONEachRow', + }); + logger.debug('Insertion successful'); + } catch (error) { + logger.error('There was an issue inserting the data into clickhouse: ' + JSON.stringify(error)); + } + return client.close(); +} diff --git a/src/utils/file-validators.ts b/src/utils/file-validators.ts index 89d4945..abf7985 100644 --- a/src/utils/file-validators.ts +++ b/src/utils/file-validators.ts @@ -31,3 +31,16 @@ export function validateBucketName(bucket: string): boolean { const regex = new RegExp(/^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$/); return regex.test(bucket); } + +export function extractHistoricData(jsonStringified: string): { ou: string; pe: string; value: number }[]{ + const jsonPayload = JSON.parse(jsonStringified); + //@ts-ignore + const diseaseCases = jsonPayload.features.find(feature => feature['featureId'] === 'disease_cases') + + if(diseaseCases === undefined){ + throw new Error("Could not find historic disease data within payload"); + } + + const {data} = diseaseCases; + return data; +}