Skip to content

Commit 67d993a

Browse files
committed
Merge branch 'main' of github.com:jembi/climate-mediator into add-endpoint-for-download
2 parents 9018dc4 + 715e8fc commit 67d993a

File tree

4 files changed

+164
-56
lines changed

4 files changed

+164
-56
lines changed

src/routes/index.ts

+27-5
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ const createSuccessResponse = (code: string, message: string): UploadResponse =>
4646
message,
4747
});
4848

49-
const saveCsvToTmp = async (fileBuffer: Buffer, fileName: string): Promise<string> => {
49+
const saveToTmp = async (fileBuffer: Buffer, fileName: string): Promise<string> => {
5050
const tmpDir = path.join(process.cwd(), 'tmp');
5151
await fs.mkdir(tmpDir, { recursive: true });
5252

@@ -77,7 +77,7 @@ const handleCsvFile = async (
7777
return createErrorResponse('INVALID_CSV_FORMAT', 'Invalid CSV file format');
7878
}
7979

80-
const fileUrl = await saveCsvToTmp(file.buffer, file.originalname);
80+
const fileUrl = await saveToTmp(file.buffer, file.originalname);
8181
try {
8282
const uploadResult = await uploadToMinio(
8383
fileUrl,
@@ -96,11 +96,33 @@ const handleCsvFile = async (
9696
}
9797
};
9898

99-
const handleJsonFile = (file: Express.Multer.File): UploadResponse => {
99+
/**
 * Validates an uploaded JSON file, writes a copy without line breaks to the
 * tmp directory and uploads that copy to Minio.
 *
 * @param file - Multer file whose buffer holds the JSON payload
 * @param bucket - Destination bucket handed to `uploadToMinio`
 * @param region - Not used by this handler; accepted so it can be called with
 *   the same arguments as `handleCsvFile`
 * @returns `INVALID_JSON_FORMAT` when validation fails, otherwise
 *   `UPLOAD_SUCCESS` / `UPLOAD_FAILED` depending on the upload result
 * @throws Re-throws any error raised by the upload after logging it
 */
const handleJsonFile = async (
  file: Express.Multer.File,
  bucket: string,
  region: string
): Promise<UploadResponse> => {
  if (!validateJsonFile(file.buffer)) {
    return createErrorResponse('INVALID_JSON_FORMAT', 'Invalid JSON file format');
  }

  // Drop every CR/LF so the stored document sits on a single line.
  const jsonString = file.buffer.toString().replace(/[\r\n]/g, '');
  const fileUrl = await saveToTmp(Buffer.from(jsonString), file.originalname);

  try {
    const uploadResult = await uploadToMinio(
      fileUrl,
      file.originalname,
      bucket,
      file.mimetype
    );

    return uploadResult.success
      ? createSuccessResponse('UPLOAD_SUCCESS', uploadResult.message)
      : createErrorResponse('UPLOAD_FAILED', uploadResult.message);
  } catch (error) {
    logger.error('Error uploading file to Minio:', error);
    throw error;
  } finally {
    // Remove the temporary copy on every path; previously a failed upload left
    // the file behind. A cleanup failure is logged rather than thrown so it
    // cannot mask the upload outcome.
    try {
      await fs.unlink(fileUrl);
    } catch (cleanupError) {
      logger.error('Error removing temporary file:', cleanupError);
    }
  }
};
105127

106128
// Main route handler
@@ -138,7 +160,7 @@ routes.post('/upload', upload.single('file'), async (req, res) => {
138160
const response =
139161
file.mimetype === 'text/csv'
140162
? await handleCsvFile(file, bucket, region)
141-
: handleJsonFile(file);
163+
: await handleJsonFile(file, bucket, region);
142164

143165
if (createBucketIfNotExists && getConfig().runningMode !== 'testing') {
144166
await registerBucket(bucket, region);

src/utils/clickhouse.ts

+83-21
Original file line numberDiff line numberDiff line change
@@ -41,31 +41,55 @@ export async function createTable(fields: string[], tableName: string) {
4141
return true;
4242
}
4343

44+
/**
 * Creates a ClickHouse table for a JSON file unless the table already exists.
 *
 * Dashes in `tableName` are replaced with underscores before use. The DDL is
 * produced by `generateDDLFromJson` from the given S3 path, credentials and
 * ordering key.
 *
 * @param s3Path - URL of the JSON file, forwarded to `generateDDLFromJson`
 * @param s3Config - Access and secret key, forwarded to `generateDDLFromJson`
 * @param tableName - Requested table name
 * @param key - Column used in the table's ORDER BY clause
 * @returns `true` when the table was created, `false` when it already existed
 * @throws Propagates any error raised by the CREATE TABLE query
 */
export async function createTableFromJson(
  s3Path: string,
  s3Config: { accessKey: string; secretKey: string },
  tableName: string,
  key: string
): Promise<boolean> {
  const client = createClient({
    url,
    password,
  });

  const normalizedTableName = tableName.replace(/-/g, '_');

  try {
    // A rejected `desc` is taken to mean the table is missing.
    // NOTE(review): any other failure (e.g. connectivity) is treated the same
    // way; the CREATE below will then surface the real error.
    let tableExists = true;
    try {
      await client.query({
        query: `desc ${normalizedTableName}`,
      });
    } catch {
      tableExists = false;
    }

    if (tableExists) {
      logger.info(`Table ${normalizedTableName} already exists`);
      return false;
    }

    // A missing table is the expected path here, so it is not logged as an error.
    logger.info(`Table ${normalizedTableName} does not exist`);

    const query = generateDDLFromJson(s3Path, s3Config, normalizedTableName, key);
    await client.query({ query });
    return true;
  } finally {
    // Close the connection on every path, including a failed CREATE.
    await client.close();
  }
}
73+
4474
/**
 * Builds the CREATE TABLE statement for a table with a generated `table_id`
 * UUID column followed by one VARCHAR column per entry in `fields`.
 *
 * @param fields - Column names, emitted in the given order
 * @param tableName - Name of the table to create
 * @returns The DDL statement as a single-line string
 */
export function generateDDL(fields: string[], tableName: string) {
  const columnDefinitions: string[] = [];
  for (const field of fields) {
    columnDefinitions.push(`${field} VARCHAR`);
  }
  const columnList = columnDefinitions.join(', ');

  return (
    'CREATE TABLE ' +
    tableName +
    ' (table_id UUID DEFAULT generateUUIDv4(),' +
    columnList +
    ') ENGINE = MergeTree ORDER BY (table_id)'
  );
}
4777

48-
export function flattenJson(json: any, prefix = ''): string[] {
49-
const fields: string[] = [];
50-
Object.keys(json).forEach((key) => {
51-
const value = json[key];
52-
if (typeof value === 'object') {
53-
if (key === 'main') {
54-
fields.push(...flattenJson(value));
55-
} else {
56-
// This is to avoid having a prefix starting with numbers
57-
if (Array.isArray(json)) {
58-
fields.push(...flattenJson(value, prefix));
59-
} else {
60-
fields.push(...flattenJson(value, `${key}_`));
61-
}
62-
}
63-
} else {
64-
fields.push(`${prefix}${key}`);
65-
}
66-
});
67-
const fieldsSet = new Set(fields);
68-
return Array.from(fieldsSet);
78+
/**
 * Builds a `CREATE TABLE IF NOT EXISTS ... EMPTY AS SELECT` statement that
 * derives the table's columns from a JSONEachRow file read through the `s3`
 * table function.
 *
 * The path and credentials are emitted as escaped string literals and the
 * ordering key as a backtick-quoted identifier, so quotes or spaces in those
 * values cannot break out of the statement. `tableName` is inserted as given.
 *
 * @param s3Path - URL of the JSON file
 * @param s3Config - Access and secret key for the `s3` table function
 * @param tableName - Table to create in the `default` database
 * @param key - Single column name used for ORDER BY
 * @returns The DDL statement
 */
export function generateDDLFromJson(
  s3Path: string,
  s3Config: { accessKey: string; secretKey: string },
  tableName: string,
  key: string
): string {
  // Backslash-escape backslashes and single quotes inside a string literal.
  const quoteLiteral = (value: string): string =>
    "'" + value.replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'";
  // Backtick-quote an identifier, escaping backslashes and embedded backticks.
  const quoteIdentifier = (value: string): string =>
    '`' + value.replace(/\\/g, '\\\\').replace(/`/g, '\\`') + '`';

  const s3Arguments = [
    quoteLiteral(s3Path),
    quoteLiteral(s3Config.accessKey),
    quoteLiteral(s3Config.secretKey),
    // Quoted to match the form used by the JSON insert statement.
    "'JSONEachRow'",
  ].join(', ');

  const query = `
    CREATE TABLE IF NOT EXISTS \`default\`.${tableName}
    ENGINE = MergeTree
    ORDER BY ${quoteIdentifier(key)} EMPTY
    AS SELECT *
    FROM s3(${s3Arguments})
    SETTINGS schema_inference_make_columns_nullable = 0
  `;
  return query;
}
7094

7195
export async function insertFromS3(
@@ -107,3 +131,41 @@ export async function insertFromS3(
107131
await client.close();
108132
}
109133
}
134+
135+
/**
 * Loads a JSONEachRow file into an existing ClickHouse table by selecting from
 * the `s3` table function.
 *
 * Dashes in `tableName` are replaced with underscores before use. The path and
 * credentials are emitted as escaped string literals so a quote or backslash
 * in any of them cannot terminate the literal early.
 *
 * @param tableName - Target table in the `default` database
 * @param s3Path - URL of the JSON file
 * @param s3Config - Access and secret key for the `s3` table function
 * @returns `true` when the insert query succeeded, `false` when it failed (the
 *   error is logged, not thrown)
 */
export async function insertFromS3Json(
  tableName: string,
  s3Path: string,
  s3Config: {
    accessKey: string;
    secretKey: string;
  }
): Promise<boolean> {
  const client = createClient({
    url,
    password,
  });

  const normalizedTableName = tableName.replace(/-/g, '_');

  // Backslash-escape backslashes and single quotes inside a string literal.
  const quoteLiteral = (value: string): string =>
    "'" + value.replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'";

  try {
    logger.debug(`Inserting data into ${normalizedTableName}`);
    const query = `
      INSERT INTO \`default\`.${normalizedTableName}
      SELECT * FROM s3(
        ${quoteLiteral(s3Path)},
        ${quoteLiteral(s3Config.accessKey)},
        ${quoteLiteral(s3Config.secretKey)},
        'JSONEachRow'
      )
    `;
    await client.query({ query });
    logger.info(`Successfully inserted data into ${normalizedTableName}`);
    return true;
  } catch (error) {
    logger.error('Error inserting data from JSON');
    logger.error(error);
    return false;
  } finally {
    await client.close();
  }
}

src/utils/minioClient.ts

+53-22
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
import * as Minio from 'minio';
2-
import axios, { AxiosError } from 'axios';
2+
import axios from 'axios';
33

44
import { getConfig } from '../config/config';
55
import logger from '../logger';
66
import crypto from 'crypto';
77
import { readFile, rm } from 'fs/promises';
8-
import { createTable, flattenJson, insertFromS3 } from './clickhouse';
8+
import {
9+
createTable,
10+
createTableFromJson,
11+
insertFromS3,
12+
insertFromS3Json,
13+
} from './clickhouse';
914
import { validateJsonFile, getCsvHeaders } from './file-validators';
1015
import { getOpenhimConfig, triggerProcessing } from '../openhim/openhim';
1116
import fs from 'fs/promises';
1217
import { createWriteStream } from 'fs';
1318
import path from 'path';
14-
import { timeStamp } from 'console';
19+
1520
export interface Bucket {
1621
bucket: string;
1722
region?: string;
@@ -46,6 +51,23 @@ interface FileExistsResponse extends MinioResponse {
4651
exists: boolean;
4752
}
4853

54+
/**
 * Get the name of the first field of a json object. When given a non-empty
 * array, the first element is inspected instead of the array itself.
 * @param {unknown} json - The parsed json value
 * @returns {string} - The first field
 * @throws {Error} - When the value (or first array element) is not an object
 * or has no fields, so callers never receive `undefined` as a field name
 */
function getFirstField(json: unknown): string {
  const candidate: unknown = Array.isArray(json) && json.length > 0 ? json[0] : json;

  if (candidate === null || typeof candidate !== 'object') {
    throw new Error('Cannot determine the first field: JSON value is not an object');
  }

  const [firstField] = Object.keys(candidate);
  if (firstField === undefined) {
    throw new Error('Cannot determine the first field: JSON object has no fields');
  }

  return firstField;
}
70+
4971
/**
5072
* Ensures a bucket exists, creates it if it doesn't
5173
* @param {string} bucket - Bucket name
@@ -217,32 +239,41 @@ export async function minioListenerHandler (bucket: string, file: string, tableN
217239
const extension = file.split('.').pop();
218240
logger.info(`File Downloaded - Type: ${extension}`);
219241

220-
if (['json', 'csv'].includes(extension as string)) {
221-
let fields: string[] = [];
242+
if (extension === 'json' && validateJsonFile(fileBuffer)) {
243+
logger.info('File is a valid json file');
222244

223-
if (extension === 'json' && validateJsonFile(fileBuffer)) {
224-
// flatten the json and pass it to clickhouse
225-
fields = flattenJson(JSON.parse(fileBuffer.toString()));
226-
} else if (getCsvHeaders(fileBuffer)) {
227-
fields = (await readFile(`tmp/${file}`, 'utf8')).split('\n')[0].split(',');
228-
}
245+
// Construct the S3-style URL for the file
246+
const minioUrl = `http://${endPoint}:${port}/${bucket}/${file}`;
229247

230-
if (fields.length) {
231-
await createTable(fields, tableName);
248+
const key = getFirstField(JSON.parse(fileBuffer.toString()));
232249

233-
// Construct the S3-style URL for the file
234-
const minioUrl = `http://${endPoint}:${port}/${bucket}/${file}`;
250+
// Create table from json
251+
await createTableFromJson(minioUrl, { accessKey, secretKey }, tableName, key);
235252

236-
// Insert data into clickhouse
237-
await insertFromS3(tableName, minioUrl, {
238-
accessKey,
239-
secretKey,
240-
});
241-
}
253+
// Insert data into clickhouse
254+
await insertFromS3Json(tableName, minioUrl, {
255+
accessKey,
256+
secretKey,
257+
});
258+
} else if (extension === 'csv' && getCsvHeaders(fileBuffer)) {
259+
//get the first line of the csv file
260+
const fields = (await readFile(`tmp/${file}`, 'utf8')).split('\n')[0].split(',');
261+
262+
await createTable(fields, tableName);
263+
264+
// If running locally and using docker compose, the minio host is 'minio'. This is to allow clickhouse to connect to the minio server
265+
266+
// Construct the S3-style URL for the file
267+
const minioUrl = `http://${endPoint}:${port}/${bucket}/${file}`;
268+
269+
// Insert data into clickhouse
270+
await insertFromS3(tableName, minioUrl, {
271+
accessKey,
272+
secretKey,
273+
});
242274
} else {
243275
logger.warn(`Unknown file type - ${extension}`);
244276
}
245-
246277
await rm(`tmp/${file}`);
247278
logger.debug(`File ${file} deleted from tmp directory`);
248279
}

tests/services/table-creation.test.ts

+1-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { expect } from 'chai';
22
import { getCsvHeaders } from '../../src/utils/file-validators';
3-
import { createTable, flattenJson, generateDDL } from '../../src/utils/clickhouse';
3+
import { createTable, generateDDL } from '../../src/utils/clickhouse';
44

55
describe('Create Tables based on files', function () {
66
this.timeout(60_000);
@@ -20,13 +20,6 @@ describe('Create Tables based on files', function () {
2020
expect(fields).to.deep.equal(['id', 'name', 'age']);
2121
});
2222

23-
it('should extract columns based on a json file', async () => {
24-
const jsonFile = Buffer.from('[{"id":1,"name":"John","age":20}]');
25-
const json = JSON.parse(jsonFile.toString());
26-
const fields = flattenJson(json[0]);
27-
expect(fields).to.deep.equal(['id', 'name', 'age']);
28-
});
29-
3023
it('should generate a table create ddl based on a csv file', async () => {
3124
const csvFile = Buffer.from('id,name,age\n1,John,20\n2,Jane,21');
3225
const fields = getCsvHeaders(csvFile);

0 commit comments

Comments
 (0)