From af4baaaa6a8a4e5c56d5effbc3ae3f27ffc78970 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 8 Dec 2023 21:07:55 +0530 Subject: [PATCH] cleaning up Avro files for Snowflake CDC --- flow/connectors/snowflake/qrep_avro_sync.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index e74d0fdb2f..0e07463807 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -101,7 +101,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords( "destinationTable": dstTableName, }).Infof("copying records into %s from stage %s", s.config.DestinationTableIdentifier, stage) - return numRecords, nil + return numRecords, os.Remove(localFilePath) } func (s *SnowflakeAvroSyncMethod) SyncQRepRecords( @@ -275,8 +275,9 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile( var numRecords int if s.config.StagingPath == "" { ocfWriter := avro.NewPeerDBOCFWriterWithCompression(s.connector.ctx, stream, avroSchema) - tmpDir, err := os.MkdirTemp("", "peerdb-avro") - if err != nil { + tmpDir := fmt.Sprintf("%s/peerdb-avro-%s", os.TempDir(), flowJobName) + err := os.Mkdir(tmpDir, os.ModePerm) + if err != nil && !os.IsExist(err) { return 0, "", fmt.Errorf("failed to create temp dir: %w", err) }