diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 4d642f402f..52b6479993 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -680,7 +680,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 	}
 	close(recordStream.Records)
 
-	avroSync := NewQRepAvroSyncMethod(c, req.StagingPath)
+	avroSync := NewQRepAvroSyncMethod(c, req.StagingPath, req.FlowJobName)
 	rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get metadata of destination table: %v", err)
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index c92e17fa14..5f395f3585 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -18,14 +18,17 @@ import (
 )
 
 type QRepAvroSyncMethod struct {
-	connector *BigQueryConnector
-	gcsBucket string
+	connector   *BigQueryConnector
+	gcsBucket   string
+	flowJobName string
 }
 
-func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string) *QRepAvroSyncMethod {
+func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string,
+	flowJobName string) *QRepAvroSyncMethod {
 	return &QRepAvroSyncMethod{
-		connector: connector,
-		gcsBucket: gcsBucket,
+		connector:   connector,
+		gcsBucket:   gcsBucket,
+		flowJobName: flowJobName,
 	}
 }
 
@@ -325,58 +328,59 @@ func (s *QRepAvroSyncMethod) writeToStage(
 		shutdown <- true
 	}()
 
-	var avroFilePath string
-	numRecords, err := func() (int, error) {
-		ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
-			avro.CompressNone, qvalue.QDWHTypeBigQuery)
-		if s.gcsBucket != "" {
-			bucket := s.connector.storageClient.Bucket(s.gcsBucket)
-			avroFilePath = fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
-			obj := bucket.Object(avroFilePath)
-			w := obj.NewWriter(s.connector.ctx)
-
-			numRecords, err := ocfWriter.WriteOCF(w)
-			if err != nil {
-				return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
-			}
-			return numRecords, err
-		} else {
-			tmpDir, err := os.MkdirTemp("", "peerdb-avro")
-			if err != nil {
-				return 0, fmt.Errorf("failed to create temp dir: %w", err)
-			}
+	var avroFile *avro.AvroFile
+	ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
+		avro.CompressNone, qvalue.QDWHTypeBigQuery)
+	if s.gcsBucket != "" {
+		bucket := s.connector.storageClient.Bucket(s.gcsBucket)
+		avroFilePath := fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
+		obj := bucket.Object(avroFilePath)
+		w := obj.NewWriter(s.connector.ctx)
 
-			avroFilePath = fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
-			log.WithFields(log.Fields{
-				"batchOrPartitionID": syncID,
-			}).Infof("writing records to local file %s", avroFilePath)
-			numRecords, err := ocfWriter.WriteRecordsToAvroFile(avroFilePath)
-			if err != nil {
-				return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
-			}
-			return numRecords, err
+		numRecords, err := ocfWriter.WriteOCF(w)
+		if err != nil {
+			return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
+		}
+		avroFile = &avro.AvroFile{
+			NumRecords:      numRecords,
+			StorageLocation: avro.AvroGCSStorage,
+			FilePath:        avroFilePath,
+		}
+	} else {
+		tmpDir := fmt.Sprintf("%s/peerdb-avro-%s", os.TempDir(), s.flowJobName)
+		err := os.MkdirAll(tmpDir, os.ModePerm)
+		if err != nil {
+			return 0, fmt.Errorf("failed to create temp dir: %w", err)
+		}
+
+		avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
+		log.WithFields(log.Fields{
+			"batchOrPartitionID": syncID,
+		}).Infof("writing records to local file %s", avroFilePath)
+		avroFile, err = ocfWriter.WriteRecordsToAvroFile(avroFilePath)
+		if err != nil {
+			return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
 		}
-	}()
-	if err != nil {
-		return 0, err
 	}
 
-	if numRecords == 0 {
+	defer avroFile.Cleanup()
+
+	if avroFile.NumRecords == 0 {
 		return 0, nil
 	}
 
 	log.WithFields(log.Fields{
 		"batchOrPartitionID": syncID,
-	}).Infof("wrote %d records to file %s", numRecords, avroFilePath)
+	}).Infof("wrote %d records to file %s", avroFile.NumRecords, avroFile.FilePath)
 
 	bqClient := s.connector.client
 	datasetID := s.connector.datasetID
 
 	var avroRef bigquery.LoadSource
 	if s.gcsBucket != "" {
-		gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath))
+		gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFile.FilePath))
 		gcsRef.SourceFormat = bigquery.Avro
 		gcsRef.Compression = bigquery.Deflate
 		avroRef = gcsRef
 	} else {
-		fh, err := os.Open(avroFilePath)
+		fh, err := os.Open(avroFile.FilePath)
 		if err != nil {
 			return 0, fmt.Errorf("failed to read local Avro file: %w", err)
 		}
@@ -401,6 +405,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
 		return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
 	}
 	log.Printf("Pushed into %s/%s",
-		avroFilePath, syncID)
-	return numRecords, nil
+		avroFile.FilePath, syncID)
+	return avroFile.NumRecords, nil
 }
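The BigQuery connector now threads `req.FlowJobName` into `QRepAvroSyncMethod` so that `writeToStage` can use a deterministic per-flow scratch directory (`os.MkdirAll` on a fixed path) instead of a fresh `os.MkdirTemp` directory per sync. A minimal sketch of that convention, extracted for clarity — `flowTempDir` is a hypothetical helper (the diff inlines this logic), and `flowJobName` is assumed to contain no path separators:

```go
package main

import (
	"fmt"
	"os"
)

// flowTempDir returns the per-flow scratch directory used above, e.g.
// /tmp/peerdb-avro-<flowJobName>. os.MkdirAll is idempotent, so repeated
// syncs and concurrent partitions of one flow can share the directory,
// and leftovers from a crashed run all land in one predictable place.
func flowTempDir(flowJobName string) (string, error) {
	tmpDir := fmt.Sprintf("%s/peerdb-avro-%s", os.TempDir(), flowJobName)
	if err := os.MkdirAll(tmpDir, os.ModePerm); err != nil {
		return "", fmt.Errorf("failed to create temp dir: %w", err)
	}
	return tmpDir, nil
}

func main() {
	dir, err := flowTempDir("example-flow")
	if err != nil {
		panic(err)
	}
	fmt.Println(dir)
}
```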
+ "batchOrPartitionID": syncID, + }).Infof("writing records to local file %s", avroFilePath) + avroFile, err = ocfWriter.WriteRecordsToAvroFile(avroFilePath) + if err != nil { + return 0, fmt.Errorf("failed to write records to local Avro file: %w", err) } - }() - if err != nil { - return 0, err } - if numRecords == 0 { + defer avroFile.Cleanup() + + if avroFile.NumRecords == 0 { return 0, nil } log.WithFields(log.Fields{ "batchOrPartitionID": syncID, - }).Infof("wrote %d records to file %s", numRecords, avroFilePath) + }).Infof("wrote %d records to file %s", avroFile.NumRecords, avroFile.FilePath) bqClient := s.connector.client datasetID := s.connector.datasetID var avroRef bigquery.LoadSource if s.gcsBucket != "" { - gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath)) + gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFile.FilePath)) gcsRef.SourceFormat = bigquery.Avro gcsRef.Compression = bigquery.Deflate avroRef = gcsRef } else { - fh, err := os.Open(avroFilePath) + fh, err := os.Open(avroFile.FilePath) if err != nil { return 0, fmt.Errorf("failed to read local Avro file: %w", err) } @@ -401,6 +405,6 @@ func (s *QRepAvroSyncMethod) writeToStage( return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err) } log.Printf("Pushed into %s/%s", - avroFilePath, syncID) - return numRecords, nil + avroFile.FilePath, syncID) + return avroFile.NumRecords, nil } diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 1f1cf881da..14d9e6501d 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -64,12 +64,13 @@ func (c *S3Connector) writeToAvroFile( s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID) writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake) - numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds) + avroFile, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds) if err != nil { return 0, fmt.Errorf("failed to write records to S3: %w", err) } + defer avroFile.Cleanup() - return numRecords, nil + return avroFile.NumRecords, nil } // S3 just sets up destination, not metadata tables diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 7d540c9f2f..920e01d1ca 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -60,14 +60,15 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords( } partitionID := util.RandomString(16) - numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName) + avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName) if err != nil { return 0, err } + defer avroFile.Cleanup() log.WithFields(log.Fields{ "destinationTable": dstTableName, "flowName": flowJobName, - }).Infof("written %d records to Avro file", numRecords) + }).Infof("written %d records to Avro file", avroFile.NumRecords) stage := s.connector.getStageNameForJob(s.config.FlowJobName) err = s.connector.createStage(stage, s.config) @@ -85,7 +86,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords( } allCols := colInfo.Columns - err = s.putFileToStage(localFilePath, stage) + err = s.putFileToStage(avroFile, stage) if err != nil { return 0, err } @@ -101,7 +102,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords( "destinationTable": dstTableName, }).Infof("copying records into %s from stage %s", 
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index 7d540c9f2f..920e01d1ca 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -60,14 +60,15 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
 	}
 
 	partitionID := util.RandomString(16)
-	numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
+	avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
 	if err != nil {
 		return 0, err
 	}
+	defer avroFile.Cleanup()
 	log.WithFields(log.Fields{
 		"destinationTable": dstTableName,
 		"flowName":         flowJobName,
-	}).Infof("written %d records to Avro file", numRecords)
+	}).Infof("written %d records to Avro file", avroFile.NumRecords)
 
 	stage := s.connector.getStageNameForJob(s.config.FlowJobName)
 	err = s.connector.createStage(stage, s.config)
@@ -85,7 +86,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
 	}
 	allCols := colInfo.Columns
 
-	err = s.putFileToStage(localFilePath, stage)
+	err = s.putFileToStage(avroFile, stage)
 	if err != nil {
 		return 0, err
 	}
@@ -101,7 +102,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
 		"destinationTable": dstTableName,
 	}).Infof("copying records into %s from stage %s",
 		s.config.DestinationTableIdentifier, stage)
-	return numRecords, nil
+	return avroFile.NumRecords, nil
 }
 
 func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
@@ -138,28 +139,15 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
 		return 0, err
 	}
 
-	numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName)
+	avroFile, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName)
 	if err != nil {
 		return 0, err
 	}
-
-	if localFilePath != "" {
-		defer func() {
-			log.Infof("removing temp file %s", localFilePath)
-			err := os.Remove(localFilePath)
-			if err != nil {
-				log.WithFields(log.Fields{
-					"flowName":         config.FlowJobName,
-					"partitionID":      partition.PartitionId,
-					"destinationTable": dstTableName,
-				}).Errorf("failed to remove temp file %s: %v", localFilePath, err)
-			}
-		}()
-	}
+	defer avroFile.Cleanup()
 
 	stage := s.connector.getStageNameForJob(config.FlowJobName)
-	err = s.putFileToStage(localFilePath, stage)
+	err = s.putFileToStage(avroFile, stage)
 	if err != nil {
 		return 0, err
 	}
@@ -175,7 +163,7 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
 
 	activity.RecordHeartbeat(s.connector.ctx, "finished syncing records")
 
-	return numRecords, nil
+	return avroFile.NumRecords, nil
 }
 
 func (s *SnowflakeAvroSyncMethod) addMissingColumns(
@@ -271,14 +259,14 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(
 	stream *model.QRecordStream,
 	avroSchema *model.QRecordAvroSchemaDefinition,
 	partitionID string,
 	flowJobName string,
-) (int, string, error) {
-	var numRecords int
+) (*avro.AvroFile, error) {
 	if s.config.StagingPath == "" {
 		ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
 			avro.CompressZstd, qvalue.QDWHTypeSnowflake)
-		tmpDir, err := os.MkdirTemp("", "peerdb-avro")
+		tmpDir := fmt.Sprintf("%s/peerdb-avro-%s", os.TempDir(), flowJobName)
+		err := os.MkdirAll(tmpDir, os.ModePerm)
 		if err != nil {
-			return 0, "", fmt.Errorf("failed to create temp dir: %w", err)
+			return nil, fmt.Errorf("failed to create temp dir: %w", err)
 		}
 
 		localFilePath := fmt.Sprintf("%s/%s.avro.zst", tmpDir, partitionID)
@@ -286,18 +274,18 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(
 			"flowName":    flowJobName,
 			"partitionID": partitionID,
 		}).Infof("writing records to local file %s", localFilePath)
-		numRecords, err = ocfWriter.WriteRecordsToAvroFile(localFilePath)
+		avroFile, err := ocfWriter.WriteRecordsToAvroFile(localFilePath)
 		if err != nil {
-			return 0, "", fmt.Errorf("failed to write records to Avro file: %w", err)
+			return nil, fmt.Errorf("failed to write records to Avro file: %w", err)
 		}
 
-		return numRecords, localFilePath, nil
+		return avroFile, nil
 	} else if strings.HasPrefix(s.config.StagingPath, "s3://") {
 		ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
 			avro.CompressZstd, qvalue.QDWHTypeSnowflake)
 		s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath)
 		if err != nil {
-			return 0, "", fmt.Errorf("failed to parse staging path: %w", err)
+			return nil, fmt.Errorf("failed to parse staging path: %w", err)
 		}
 
 		s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, s.config.FlowJobName, partitionID)
@@ -305,39 +293,39 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(
 			"flowName":    flowJobName,
 			"partitionID": partitionID,
 		}).Infof("OCF: Writing records to S3")
-		numRecords, err = ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{})
+		avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{})
 		if err != nil {
-			return 0, "", fmt.Errorf("failed to write records to S3: %w", err)
+			return nil, fmt.Errorf("failed to write records to S3: %w", err)
 		}
 
-		return numRecords, "", nil
+		return avroFile, nil
 	}
 
-	return 0, "", fmt.Errorf("unsupported staging path: %s", s.config.StagingPath)
+	return nil, fmt.Errorf("unsupported staging path: %s", s.config.StagingPath)
 }
 
-func (s *SnowflakeAvroSyncMethod) putFileToStage(localFilePath string, stage string) error {
-	if localFilePath == "" {
+func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage string) error {
+	if avroFile.StorageLocation != avro.AvroLocalStorage {
 		log.Infof("no file to put to stage")
 		return nil
 	}
 
 	activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")
-	putCmd := fmt.Sprintf("PUT file://%s @%s", localFilePath, stage)
+	putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)
 
-	sutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
+	shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
 		return fmt.Sprintf("putting file to stage %s", stage)
 	})
 	defer func() {
-		sutdown <- true
+		shutdown <- true
 	}()
 
 	if _, err := s.connector.database.Exec(putCmd); err != nil {
 		return fmt.Errorf("failed to put file to stage: %w", err)
 	}
 
-	log.Infof("put file %s to stage %s", localFilePath, stage)
+	log.Infof("put file %s to stage %s", avroFile.FilePath, stage)
 	return nil
 }
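Collapsing `(int, string, error)` into `(*avro.AvroFile, error)` removes the sentinel convention where an empty `localFilePath` meant "already staged in S3"; `putFileToStage` now branches on an explicit `StorageLocation` instead of an empty string. A sketch of that dispatch as a standalone helper — `stagePutCommand` is a hypothetical name (the diff inlines the check), and it assumes `FilePath` needs no quoting in the SQL text:

```go
package sketch

import (
	"fmt"

	"github.com/PeerDB-io/peer-flow/connectors/utils/avro"
)

// stagePutCommand builds the Snowflake PUT statement for a locally
// written Avro file. It returns ok=false when the file already lives in
// external storage (S3/GCS) and therefore needs no PUT.
func stagePutCommand(avroFile *avro.AvroFile, stage string) (cmd string, ok bool) {
	if avroFile.StorageLocation != avro.AvroLocalStorage {
		return "", false
	}
	return fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage), true
}
```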
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index b0c5576259..3c7fa5830c 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -22,6 +22,7 @@ import (
 )
 
 type AvroCompressionCodec int64
+type AvroStorageLocation int64
 
 const (
 	CompressNone AvroCompressionCodec = iota
@@ -30,6 +31,12 @@ const (
 	CompressSnappy
 )
 
+const (
+	AvroLocalStorage AvroStorageLocation = iota
+	AvroS3Storage
+	AvroGCSStorage
+)
+
 type peerDBOCFWriter struct {
 	ctx                  context.Context
 	stream               *model.QRecordStream
@@ -39,6 +46,21 @@ type peerDBOCFWriter struct {
 	targetDWH            qvalue.QDWHType
 }
 
+type AvroFile struct {
+	NumRecords      int
+	StorageLocation AvroStorageLocation
+	FilePath        string
+}
+
+func (l *AvroFile) Cleanup() {
+	if l.StorageLocation == AvroLocalStorage {
+		err := os.Remove(l.FilePath)
+		if err != nil && !os.IsNotExist(err) {
+			log.Warnf("unable to delete temporary Avro file: %v", err)
+		}
+	}
+}
+
 func NewPeerDBOCFWriter(
 	ctx context.Context,
 	stream *model.QRecordStream,
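`Cleanup` is written to be safe to call unconditionally and repeatedly: it touches only local files, and a file that is already gone (`os.IsNotExist`) is not treated as an error, so stacked `defer`s or a retry after a crash produce no spurious warnings. A runnable sketch of that contract using a stand-in type that mirrors the fields above:

```go
package main

import (
	"fmt"
	"log"
	"os"
)

type AvroStorageLocation int64

const (
	AvroLocalStorage AvroStorageLocation = iota
	AvroS3Storage
	AvroGCSStorage
)

// AvroFile mirrors the type added in this hunk.
type AvroFile struct {
	NumRecords      int
	StorageLocation AvroStorageLocation
	FilePath        string
}

// Cleanup deletes the backing file only when it is local; a missing file
// is tolerated so the method is idempotent.
func (f *AvroFile) Cleanup() {
	if f.StorageLocation == AvroLocalStorage {
		if err := os.Remove(f.FilePath); err != nil && !os.IsNotExist(err) {
			log.Printf("unable to delete temporary Avro file: %v", err)
		}
	}
}

func main() {
	tmp, err := os.CreateTemp("", "avrofile-demo-*.avro")
	if err != nil {
		log.Fatal(err)
	}
	tmp.Close()

	local := &AvroFile{StorageLocation: AvroLocalStorage, FilePath: tmp.Name()}
	local.Cleanup() // removes the file
	local.Cleanup() // second call is silent: os.IsNotExist is tolerated

	remote := &AvroFile{StorageLocation: AvroS3Storage, FilePath: "prefix/part.avro"}
	remote.Cleanup() // no-op: remote objects are never deleted here
	fmt.Println("done")
}
```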
%s", result.Location) - return <-numRowsWritten, nil + return &AvroFile{ + NumRecords: <-numRowsWritten, + StorageLocation: AvroS3Storage, + FilePath: key, + }, nil } -func (p *peerDBOCFWriter) WriteRecordsToAvroFile(filePath string) (int, error) { +func (p *peerDBOCFWriter) WriteRecordsToAvroFile(filePath string) (*AvroFile, error) { file, err := os.Create(filePath) if err != nil { - return 0, fmt.Errorf("failed to create file: %w", err) + return nil, fmt.Errorf("failed to create temporary Avro file: %w", err) + } + + numRecords, err := p.WriteOCF(file) + if err != nil { + return nil, fmt.Errorf("failed to write records to temporary Avro file: %w", err) } - defer file.Close() - return p.WriteOCF(file) + + return &AvroFile{ + NumRecords: numRecords, + StorageLocation: AvroLocalStorage, + FilePath: filePath, + }, nil } type nopWriteCloser struct {