@@ -2,13 +2,111 @@ import * as Minio from 'minio';
2
2
import { getConfig } from '../config/config' ;
3
3
import logger from '../logger' ;
4
4
import { readFile , rm } from 'fs/promises' ;
5
- import { createTable } from './clickhouse' ;
5
+ import { createTable , insertFromS3 } from './clickhouse' ;
6
6
import { validateJsonFile , getCsvHeaders } from './file-validators' ;
7
7
8
- export async function createMinioBucketListeners ( ) {
9
- const { buckets, endPoint, port, useSSL, accessKey, secretKey, prefix, suffix } =
10
- getConfig ( ) . minio ;
8
+ const { endPoint, port, useSSL, bucketRegion, accessKey, secretKey, prefix, suffix, buckets } =
9
+ getConfig ( ) . minio ;
10
+
11
+ /**
12
+ * Uploads a file to Minio storage
13
+ * @param {string } sourceFile - Path to the file to upload
14
+ * @param {string } destinationObject - Name for the uploaded object
15
+ * @param {Object } [customMetadata={}] - Optional custom metadata
16
+ * @returns {Promise<void> }
17
+ */
18
+ export async function uploadToMinio (
19
+ sourceFile : string ,
20
+ destinationObject : string ,
21
+ bucket : string ,
22
+ fileType : string ,
23
+ customMetadata = { }
24
+ ) {
25
+ const minioClient = new Minio . Client ( {
26
+ endPoint,
27
+ port,
28
+ useSSL,
29
+ accessKey,
30
+ secretKey,
31
+ } ) ;
32
+ // Check if bucket exists, create if it doesn't
33
+ const exists = await minioClient . bucketExists ( bucket ) ;
34
+ if ( ! exists ) {
35
+ await minioClient . makeBucket ( bucket , bucketRegion ) ;
36
+ logger . info ( `Bucket ${ bucket } created in "${ bucketRegion } ".` ) ;
37
+ }
38
+
39
+ try {
40
+ const fileExists = await checkFileExists ( destinationObject , bucket , fileType ) ;
41
+ if ( fileExists ) {
42
+ return false ;
43
+ } else {
44
+ const metaData = {
45
+ 'Content-Type' : fileType ,
46
+ 'X-Upload-Id' : crypto . randomUUID ( ) ,
47
+ ...customMetadata ,
48
+ } ;
49
+
50
+ // Upload the file
51
+ await minioClient . fPutObject ( bucket , destinationObject , sourceFile , metaData ) ;
52
+ logger . info (
53
+ `File ${ sourceFile } uploaded as object ${ destinationObject } in bucket ${ bucket } `
54
+ ) ;
55
+ return true ;
56
+ }
57
+ } catch ( error ) {
58
+ console . error ( 'Error checking file:' , error ) ;
59
+ }
60
+ }
61
+
62
+ /**
63
+ * Checks if a CSV file exists in the specified Minio bucket
64
+ * @param {string } fileName - Name of the CSV file to check
65
+ * @param {string } bucket - Bucket name
66
+ * @returns {Promise<boolean> } - Returns true if file exists, false otherwise
67
+ */
68
+ export async function checkFileExists (
69
+ fileName : string ,
70
+ bucket : string ,
71
+ fileType : string
72
+ ) : Promise < boolean > {
73
+ const minioClient = new Minio . Client ( {
74
+ endPoint,
75
+ port,
76
+ useSSL,
77
+ accessKey,
78
+ secretKey,
79
+ } ) ;
80
+
81
+ try {
82
+ // Check if bucket exists first
83
+ const bucketExists = await minioClient . bucketExists ( bucket ) ;
84
+ if ( ! bucketExists ) {
85
+ logger . info ( `Bucket ${ bucket } does not exist` ) ;
86
+ return false ;
87
+ }
11
88
89
+ // Get object stats to check if file exists
90
+ const stats = await minioClient . statObject ( bucket , fileName ) ; // Optionally verify it's a CSV file by checking Content-Type
91
+ if ( stats . metaData && stats . metaData [ 'content-type' ] === fileType ) {
92
+ logger . info ( `File ${ fileName } exists in bucket ${ bucket } ` ) ;
93
+ return true ;
94
+ } else {
95
+ logger . info ( `File ${ fileName } does not exist in bucket ${ bucket } ` ) ;
96
+ return false ;
97
+ }
98
+ } catch ( err : any ) {
99
+ if ( err . code === 'NotFound' ) {
100
+ logger . debug ( `File ${ fileName } not found in bucket ${ bucket } ` ) ;
101
+ return false ;
102
+ }
103
+ // For any other error, log it and rethrow
104
+ logger . error ( `Error checking file existence: ${ err . message } ` ) ;
105
+ throw err ;
106
+ }
107
+ }
108
+
109
+ export async function createMinioBucketListeners ( ) {
12
110
const minioClient = new Minio . Client ( {
13
111
endPoint,
14
112
port,
@@ -36,12 +134,15 @@ export async function createMinioBucketListeners() {
36
134
logger . debug ( `Listening for notifications on bucket ${ bucket } ` ) ;
37
135
38
136
listener . on ( 'notification' , async ( notification ) => {
137
+
39
138
//@ts -ignore
40
139
const file = notification . s3 . object . key ;
41
-
140
+
42
141
//@ts -ignore
43
142
const tableName = notification . s3 . bucket . name ;
44
143
144
+ logger . info ( `File received: ${ file } from bucket ${ tableName } ` ) ;
145
+
45
146
//@ts -ignore
46
147
minioClient . fGetObject ( bucket , file , `tmp/${ file } ` , async ( err ) => {
47
148
if ( err ) {
@@ -63,10 +164,22 @@ export async function createMinioBucketListeners() {
63
164
const fields = ( await readFile ( `tmp/${ file } ` , 'utf8' ) ) . split ( '\n' ) [ 0 ] . split ( ',' ) ;
64
165
65
166
await createTable ( fields , tableName ) ;
167
+
168
+ // If running locally and using docker compose, the minio host is 'minio'. This is to allow clickhouse to connect to the minio server
169
+ const host = getConfig ( ) . runningMode === 'testing' ? 'minio' : endPoint ;
170
+ // Construct the S3-style URL for the file
171
+ const minioUrl = `http://${ host } :${ port } /${ bucket } /${ file } ` ;
172
+
173
+ // Insert data into clickhouse
174
+ await insertFromS3 ( tableName , minioUrl , {
175
+ accessKey,
176
+ secretKey,
177
+ } ) ;
66
178
} else {
67
179
logger . warn ( `Unknown file type - ${ extension } ` ) ;
68
180
}
69
181
await rm ( `tmp/${ file } ` ) ;
182
+ logger . debug ( `File ${ file } deleted from tmp directory` ) ;
70
183
}
71
184
} ) ;
72
185
} ) ;
0 commit comments