Skip to content

Commit 4fd9859

Browse files
committed
storing predictions within clickhouse
1 parent 5805ddd commit 4fd9859

File tree

3 files changed

+23
-24
lines changed

3 files changed

+23
-24
lines changed

src/routes/index.ts

+3-14
Original file line numberDiff line numberDiff line change
@@ -277,9 +277,9 @@ async function getPrediction(trainingFileFormData: FormData, historicFutureFormD
277277
})
278278

279279
logger.debug(`CHAP Prediction Results: ${prediction.status === 201 ? 'Successful Received Prediction':'Failed to Received Prediction'}`);
280-
281-
const stringifiedPrediction = JSON.stringify(prediction.data);
282-
const originalFileName = 'prediction-result.json';
280+
const { predictions } = prediction.data
281+
const stringifiedPrediction = JSON.stringify(predictions);
282+
const originalFileName = `prediction-result.json`;
283283
const fileUrl = await saveToTmp(Buffer.from(stringifiedPrediction), originalFileName);
284284

285285
await uploadToMinio(
@@ -290,17 +290,6 @@ async function getPrediction(trainingFileFormData: FormData, historicFutureFormD
290290
);
291291
await fs.unlink(fileUrl);
292292

293-
//TODO: check if table does not exist
294-
await client.exec({query: `CREATE TABLE ${bucket}_prediction (time_period String, predicted_value Float64) ENGINE = MergeTree ORDER BY (time_period);`});
295-
296-
await client.insert({
297-
table: `${bucket}_prediction`,
298-
values: prediction.data.predictions,
299-
format: 'JSONEachRow'
300-
});
301-
302-
logger.debug('Clickhouse Data Insertion Completed')
303-
304293
} catch (error) {
305294
logger.error(`Failed to receive prediction: ${error}`);
306295
}

src/utils/clickhouse.ts

+17-7
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,22 @@ export async function createTable(fields: string[], tableName: string) {
4141
return true;
4242
}
4343

44+
/**
45+
* Create a table within clickhouse from the inferred schema from the json file
46+
* if table already exists within the clickhouse function will return false
47+
*
48+
* @param s3Path URL location of the json within Minio
49+
* @param s3Config Access key and Secrete key credentials to access Minio
50+
* @param tableName The name of the table to be created within Minio
51+
* @param groupByColumnName The column the created table will be ORDERED By within clickhouse
52+
* @returns
53+
*/
54+
4455
export async function createTableFromJson(
4556
s3Path: string,
4657
s3Config: { accessKey: string; secretKey: string },
4758
tableName: string,
48-
key: string
59+
groupByColumnName: string
4960
) {
5061
const client = createClient({
5162
url,
@@ -54,19 +65,18 @@ export async function createTableFromJson(
5465

5566
const normalizedTableName = tableName.replace(/-/g, '_');
5667

57-
//check if the table exists
5868
try {
5969
const existsResult = await client.query({
6070
query: `desc ${normalizedTableName}`,
6171
});
62-
logger.info(`Table ${normalizedTableName} already exists`);
72+
logger.debug(`Table ${normalizedTableName} already exists`);
6373
await client.close();
6474
return false;
6575
} catch (error) {
66-
logger.error(`Table ${normalizedTableName} does not exist`);
76+
logger.debug(`Table ${normalizedTableName} does not exist`);
6777
}
6878

69-
const query = generateDDLFromJson(s3Path, s3Config, normalizedTableName, key);
79+
const query = generateDDLFromJson(s3Path, s3Config, normalizedTableName, groupByColumnName);
7080
await client.query({ query });
7181
await client.close();
7282
}
@@ -79,12 +89,12 @@ export function generateDDLFromJson(
7989
s3Path: string,
8090
s3Config: { accessKey: string; secretKey: string },
8191
tableName: string,
82-
key: string
92+
groupByColumnName: string
8393
) {
8494
const query = `
8595
CREATE TABLE IF NOT EXISTS \`default\`.${tableName}
8696
ENGINE = MergeTree
87-
ORDER BY ${key} EMPTY
97+
ORDER BY ${groupByColumnName} EMPTY
8898
AS SELECT *
8999
FROM s3('${s3Path}', '${s3Config.accessKey}', '${s3Config.secretKey}', JSONEachRow)
90100
SETTINGS schema_inference_make_columns_nullable = 0

src/utils/minioClient.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ export async function createMinioBucketListeners(listOfBuckets: string[]) {
198198
const file = notification.s3.object.key;
199199

200200
//@ts-ignore
201-
const tableName = notification.s3.bucket.name;
201+
const tableName = notification.s3.bucket.name + '_predictions';
202202

203203
logger.info(`File received: ${file} from bucket ${tableName}`);
204204

@@ -216,10 +216,10 @@ export async function createMinioBucketListeners(listOfBuckets: string[]) {
216216
if (extension === 'json' && validateJsonFile(fileBuffer)) {
217217
logger.debug('Now inserting ' + file + 'into clickhouse');
218218

219-
const key = getFirstField(JSON.parse(fileBuffer.toString()));
219+
const groupByColumnName = getFirstField(JSON.parse(fileBuffer.toString()));
220220

221221
// Create table from json
222-
await createTableFromJson(minioUrl, { accessKey, secretKey }, tableName, key);
222+
await createTableFromJson(minioUrl, { accessKey, secretKey }, tableName, groupByColumnName);
223223

224224
// Insert data into clickhouse
225225
await insertFromS3Json(tableName, minioUrl, {

0 commit comments

Comments
 (0)