@@ -182,45 +182,40 @@ export async function createMinioBucketListeners() {
182
182
logger . info ( `File received: ${ file } from bucket ${ tableName } ` ) ;
183
183
184
184
try {
185
- //@ts -ignore
186
- minioClient . fGetObject ( bucket , file , `tmp/${ file } ` , async ( err ) => {
187
- if ( err ) {
188
- logger . error ( err ) ;
189
- } else {
190
- const fileBuffer = await readFile ( `tmp/${ file } ` ) ;
191
-
192
- //get the file extension
193
- const extension = file . split ( '.' ) . pop ( ) ;
194
- logger . info ( `File Downloaded - Type: ${ extension } ` ) ;
195
-
196
- if ( extension === 'json' && validateJsonFile ( fileBuffer ) ) {
197
- // flatten the json and pass it to clickhouse
198
- //const fields = flattenJson(JSON.parse(fileBuffer.toString()));
199
- //await createTable(fields, tableName);
200
- logger . warn ( `File type not currently supported- ${ extension } ` ) ;
201
- } else if ( extension === 'csv' && getCsvHeaders ( fileBuffer ) ) {
202
- //get the first line of the csv file
203
- const fields = ( await readFile ( `tmp/${ file } ` , 'utf8' ) ) . split ( '\n' ) [ 0 ] . split ( ',' ) ;
204
-
205
- await createTable ( fields , tableName ) ;
206
-
207
- // If running locally and using docker compose, the minio host is 'minio'. This is to allow clickhouse to connect to the minio server
208
- const host = getConfig ( ) . runningMode === 'testing' ? 'minio' : endPoint ;
209
- // Construct the S3-style URL for the file
210
- const minioUrl = `http://${ host } :${ port } /${ bucket } /${ file } ` ;
211
-
212
- // Insert data into clickhouse
213
- await insertFromS3 ( tableName , minioUrl , {
214
- accessKey,
215
- secretKey,
216
- } ) ;
217
- } else {
218
- logger . warn ( `Unknown file type - ${ extension } ` ) ;
219
- }
220
- await rm ( `tmp/${ file } ` ) ;
221
- logger . debug ( `File ${ file } deleted from tmp directory` ) ;
222
- }
223
- } ) ;
185
+ await minioClient . fGetObject ( bucket , file , `tmp/${ file } ` ) ;
186
+
187
+ const fileBuffer = await readFile ( `tmp/${ file } ` ) ;
188
+
189
+ //get the file extension
190
+ const extension = file . split ( '.' ) . pop ( ) ;
191
+ logger . info ( `File Downloaded - Type: ${ extension } ` ) ;
192
+
193
+ if ( extension === 'json' && validateJsonFile ( fileBuffer ) ) {
194
+ // flatten the json and pass it to clickhouse
195
+ //const fields = flattenJson(JSON.parse(fileBuffer.toString()));
196
+ //await createTable(fields, tableName);
197
+ logger . warn ( `File type not currently supported- ${ extension } ` ) ;
198
+ } else if ( extension === 'csv' && getCsvHeaders ( fileBuffer ) ) {
199
+ //get the first line of the csv file
200
+ const fields = ( await readFile ( `tmp/${ file } ` , 'utf8' ) ) . split ( '\n' ) [ 0 ] . split ( ',' ) ;
201
+
202
+ await createTable ( fields , tableName ) ;
203
+
204
+ // If running locally and using docker compose, the minio host is 'minio'. This is to allow clickhouse to connect to the minio server
205
+ const host = getConfig ( ) . runningMode === 'testing' ? 'minio' : endPoint ;
206
+ // Construct the S3-style URL for the file
207
+ const minioUrl = `http://${ host } :${ port } /${ bucket } /${ file } ` ;
208
+
209
+ // Insert data into clickhouse
210
+ await insertFromS3 ( tableName , minioUrl , {
211
+ accessKey,
212
+ secretKey,
213
+ } ) ;
214
+ } else {
215
+ logger . warn ( `Unknown file type - ${ extension } ` ) ;
216
+ }
217
+ await rm ( `tmp/${ file } ` ) ;
218
+ logger . debug ( `File ${ file } deleted from tmp directory` ) ;
224
219
} catch ( error ) {
225
220
logger . error ( `Error processing file ${ file } : ${ error } ` ) ;
226
221
}
0 commit comments