cleanup local Avro files, and fixed directory #767

Merged Dec 7, 2023 · 2 commits · Changes from 1 commit
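Context for the diff below: every Avro writer now returns an `*avro.AvroFile` instead of a bare record count, and callers `defer avroFile.Cleanup()`. The type itself lives in flow/connectors/utils/avro and is not part of this diff view; the following is a minimal sketch reconstructed from the call sites, so the field and constant names match the usages below but the body of `Cleanup` is an assumption:

```go
// Sketch only: reconstructed from the call sites in this PR, not the
// actual code in flow/connectors/utils/avro.
package avro

import (
	"log"
	"os"
)

type AvroStorageLocation int

const (
	AvroLocalStorage AvroStorageLocation = iota
	AvroS3Storage // assumed: the S3 writer also returns an AvroFile
	AvroGCSStorage
)

type AvroFile struct {
	NumRecords      int
	StorageLocation AvroStorageLocation
	FilePath        string
}

// Cleanup removes the file only when it lives on local disk; for GCS- or
// S3-backed files it is a no-op. That is what makes the unconditional
// `defer avroFile.Cleanup()` in the connectors safe on every branch.
func (a *AvroFile) Cleanup() {
	if a.StorageLocation == AvroLocalStorage {
		if err := os.Remove(a.FilePath); err != nil && !os.IsNotExist(err) {
			log.Printf("unable to delete temporary Avro file: %v", err)
		}
	}
}
```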
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go

@@ -680,7 +680,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 	}

 	close(recordStream.Records)
-	avroSync := NewQRepAvroSyncMethod(c, req.StagingPath)
+	avroSync := NewQRepAvroSyncMethod(c, req.StagingPath, req.FlowJobName)
 	rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get metadata of destination table: %v", err)
90 changes: 47 additions & 43 deletions flow/connectors/bigquery/qrep_avro_sync.go

@@ -18,14 +18,17 @@ import (
 )

 type QRepAvroSyncMethod struct {
-	connector *BigQueryConnector
-	gcsBucket string
+	connector   *BigQueryConnector
+	gcsBucket   string
+	flowJobName string
 }

-func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string) *QRepAvroSyncMethod {
+func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string,
+	flowJobName string) *QRepAvroSyncMethod {
 	return &QRepAvroSyncMethod{
-		connector: connector,
-		gcsBucket: gcsBucket,
+		connector:   connector,
+		gcsBucket:   gcsBucket,
+		flowJobName: flowJobName,
 	}
 }

@@ -325,58 +328,59 @@ func (s *QRepAvroSyncMethod) writeToStage(
 		shutdown <- true
 	}()

-	var avroFilePath string
-	numRecords, err := func() (int, error) {
-		ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
-			avro.CompressNone, qvalue.QDWHTypeBigQuery)
-		if s.gcsBucket != "" {
-			bucket := s.connector.storageClient.Bucket(s.gcsBucket)
-			avroFilePath = fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
-			obj := bucket.Object(avroFilePath)
-			w := obj.NewWriter(s.connector.ctx)
-
-			numRecords, err := ocfWriter.WriteOCF(w)
-			if err != nil {
-				return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
-			}
-			return numRecords, err
-		} else {
-			tmpDir, err := os.MkdirTemp("", "peerdb-avro")
-			if err != nil {
-				return 0, fmt.Errorf("failed to create temp dir: %w", err)
-			}
-
-			avroFilePath = fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
-			log.WithFields(log.Fields{
-				"batchOrPartitionID": syncID,
-			}).Infof("writing records to local file %s", avroFilePath)
-			numRecords, err := ocfWriter.WriteRecordsToAvroFile(avroFilePath)
-			if err != nil {
-				return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
-			}
-			return numRecords, err
-		}
-	}()
-	if err != nil {
-		return 0, err
-	}
-	if numRecords == 0 {
+	var avroFile *avro.AvroFile
+	ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
+		avro.CompressNone, qvalue.QDWHTypeBigQuery)
+	if s.gcsBucket != "" {
+		bucket := s.connector.storageClient.Bucket(s.gcsBucket)
+		avroFilePath := fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
+		obj := bucket.Object(avroFilePath)
+		w := obj.NewWriter(s.connector.ctx)
+
+		numRecords, err := ocfWriter.WriteOCF(w)
+		if err != nil {
+			return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
+		}
+		avroFile = &avro.AvroFile{
+			NumRecords:      numRecords,
+			StorageLocation: avro.AvroGCSStorage,
+			FilePath:        avroFilePath,
+		}
+	} else {
+		tmpDir := fmt.Sprintf("%s/peerdb-avro-%s", os.TempDir(), s.flowJobName)
+		err := os.MkdirAll(tmpDir, os.ModePerm)
+		if err != nil {
+			return 0, fmt.Errorf("failed to create temp dir: %w", err)
+		}
+
+		avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
+		log.WithFields(log.Fields{
+			"batchOrPartitionID": syncID,
+		}).Infof("writing records to local file %s", avroFilePath)
+		avroFile, err = ocfWriter.WriteRecordsToAvroFile(avroFilePath)
+		if err != nil {
+			return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
+		}
+	}
+	defer avroFile.Cleanup()
+
+	if avroFile.NumRecords == 0 {
 		return 0, nil
 	}
 	log.WithFields(log.Fields{
 		"batchOrPartitionID": syncID,
-	}).Infof("wrote %d records to file %s", numRecords, avroFilePath)
+	}).Infof("wrote %d records to file %s", avroFile.NumRecords, avroFile.FilePath)

 	bqClient := s.connector.client
 	datasetID := s.connector.datasetID
 	var avroRef bigquery.LoadSource
 	if s.gcsBucket != "" {
-		gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath))
+		gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFile.FilePath))
 		gcsRef.SourceFormat = bigquery.Avro
 		gcsRef.Compression = bigquery.Deflate
 		avroRef = gcsRef
 	} else {
-		fh, err := os.Open(avroFilePath)
+		fh, err := os.Open(avroFile.FilePath)
 		if err != nil {
 			return 0, fmt.Errorf("failed to read local Avro file: %w", err)
 		}

@@ -401,6 +405,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
 		return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
 	}
 	log.Printf("Pushed into %s/%s",
-		avroFilePath, syncID)
-	return numRecords, nil
+		avroFile.FilePath, syncID)
+	return avroFile.NumRecords, nil
 }
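The local branch above replaces `os.MkdirTemp("", "peerdb-avro")`, which minted a fresh randomly named directory on every sync, with one fixed directory per flow job. Leftovers from a crashed run now land in a predictable place, and successive syncs reuse the same directory. A small sketch of the naming scheme, with a hypothetical flow job name:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// avroTempDir mirrors the scheme in the diff above: one fixed,
// per-flow-job directory under the system temp dir instead of a fresh
// os.MkdirTemp directory per sync.
func avroTempDir(flowJobName string) (string, error) {
	dir := fmt.Sprintf("%s/peerdb-avro-%s", os.TempDir(), flowJobName)
	// MkdirAll is idempotent, so repeated syncs of the same flow reuse the dir.
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		return "", fmt.Errorf("failed to create temp dir: %w", err)
	}
	return dir, nil
}

func main() {
	dir, err := avroTempDir("orders_sync") // hypothetical flow job name
	if err != nil {
		panic(err)
	}
	// On Linux this prints /tmp/peerdb-avro-orders_sync/some-sync-id.avro.
	fmt.Println(filepath.Join(dir, "some-sync-id.avro"))
}
```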
5 changes: 3 additions & 2 deletions flow/connectors/s3/qrep.go

@@ -64,12 +64,13 @@ func (c *S3Connector) writeToAvroFile(

 	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID)
 	writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake)
-	numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds)
+	avroFile, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds)
 	if err != nil {
 		return 0, fmt.Errorf("failed to write records to S3: %w", err)
 	}
+	defer avroFile.Cleanup()

-	return numRecords, nil
+	return avroFile.NumRecords, nil
 }

 // S3 just sets up destination, not metadata tables
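Deferring `Cleanup` on a file that was written straight to S3 looks odd at first; under the `AvroFile` sketch above it is a deliberate no-op, because the storage location is not local. A hedged sketch of the shape `WriteRecordsToS3` plausibly has (the real implementation is not in this diff, and `uploadOCF` is a stand-in for the actual AWS SDK upload):

```go
package avro

import "fmt"

// Sketch: assumed shape of the S3 writer's result. The point is the
// non-local StorageLocation, which turns the caller's deferred Cleanup
// into a no-op; the upload itself is elided behind uploadOCF.
func writeRecordsToS3(bucket, key string, uploadOCF func() (int, error)) (*AvroFile, error) {
	numRecords, err := uploadOCF()
	if err != nil {
		return nil, fmt.Errorf("failed to write records to S3: %w", err)
	}
	return &AvroFile{
		NumRecords:      numRecords,
		StorageLocation: AvroS3Storage, // assumed constant, see the sketch above
		FilePath:        fmt.Sprintf("s3://%s/%s", bucket, key),
	}, nil
}
```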
66 changes: 27 additions & 39 deletions flow/connectors/snowflake/qrep_avro_sync.go

@@ -60,14 +60,15 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
 	}

 	partitionID := util.RandomString(16)
-	numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
+	avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
 	if err != nil {
 		return 0, err
 	}
+	defer avroFile.Cleanup()
 	log.WithFields(log.Fields{
 		"destinationTable": dstTableName,
 		"flowName":         flowJobName,
-	}).Infof("written %d records to Avro file", numRecords)
+	}).Infof("written %d records to Avro file", avroFile.NumRecords)

 	stage := s.connector.getStageNameForJob(s.config.FlowJobName)
 	err = s.connector.createStage(stage, s.config)

@@ -85,7 +86,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
 	}

 	allCols := colInfo.Columns
-	err = s.putFileToStage(localFilePath, stage)
+	err = s.putFileToStage(avroFile, stage)
 	if err != nil {
 		return 0, err
 	}

@@ -101,7 +102,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
 		"destinationTable": dstTableName,
 	}).Infof("copying records into %s from stage %s", s.config.DestinationTableIdentifier, stage)

-	return numRecords, nil
+	return avroFile.NumRecords, nil
 }

 func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
@@ -138,28 +139,15 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
 		return 0, err
 	}

-	numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName)
+	avroFile, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName)
 	if err != nil {
 		return 0, err
 	}
-
-	if localFilePath != "" {
-		defer func() {
-			log.Infof("removing temp file %s", localFilePath)
-			err := os.Remove(localFilePath)
-			if err != nil {
-				log.WithFields(log.Fields{
-					"flowName":         config.FlowJobName,
-					"partitionID":      partition.PartitionId,
-					"destinationTable": dstTableName,
-				}).Errorf("failed to remove temp file %s: %v", localFilePath, err)
-			}
-		}()
-	}
+	defer avroFile.Cleanup()
Comment on lines 143 to +146 (Member):

Can there be an error in the middle of writing the file that leads to a file still being left around? In that case we'll return the error, but not clean up.

Similar cases below.


 	stage := s.connector.getStageNameForJob(config.FlowJobName)

-	err = s.putFileToStage(localFilePath, stage)
+	err = s.putFileToStage(avroFile, stage)
 	if err != nil {
 		return 0, err
 	}

@@ -175,7 +163,7 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(

 	activity.RecordHeartbeat(s.connector.ctx, "finished syncing records")

-	return numRecords, nil
+	return avroFile.NumRecords, nil
 }

 func (s *SnowflakeAvroSyncMethod) addMissingColumns(
@@ -271,73 +259,73 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(
 	avroSchema *model.QRecordAvroSchemaDefinition,
 	partitionID string,
 	flowJobName string,
-) (int, string, error) {
-	var numRecords int
+) (*avro.AvroFile, error) {
 	if s.config.StagingPath == "" {
 		ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
 			qvalue.QDWHTypeSnowflake)
-		tmpDir, err := os.MkdirTemp("", "peerdb-avro")
+		tmpDir := fmt.Sprintf("%s/peerdb-avro-%s", os.TempDir(), flowJobName)
+		err := os.MkdirAll(tmpDir, os.ModePerm)
 		if err != nil {
-			return 0, "", fmt.Errorf("failed to create temp dir: %w", err)
+			return nil, fmt.Errorf("failed to create temp dir: %w", err)
 		}

 		localFilePath := fmt.Sprintf("%s/%s.avro.zst", tmpDir, partitionID)
 		log.WithFields(log.Fields{
 			"flowName":    flowJobName,
 			"partitionID": partitionID,
 		}).Infof("writing records to local file %s", localFilePath)
-		numRecords, err = ocfWriter.WriteRecordsToAvroFile(localFilePath)
+		avroFile, err := ocfWriter.WriteRecordsToAvroFile(localFilePath)
 		if err != nil {
-			return 0, "", fmt.Errorf("failed to write records to Avro file: %w", err)
+			return nil, fmt.Errorf("failed to write records to Avro file: %w", err)
 		}

-		return numRecords, localFilePath, nil
+		return avroFile, nil
 	} else if strings.HasPrefix(s.config.StagingPath, "s3://") {
 		ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
 			qvalue.QDWHTypeSnowflake)
 		s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath)
 		if err != nil {
-			return 0, "", fmt.Errorf("failed to parse staging path: %w", err)
+			return nil, fmt.Errorf("failed to parse staging path: %w", err)
 		}

 		s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, s.config.FlowJobName, partitionID)
 		log.WithFields(log.Fields{
 			"flowName":    flowJobName,
 			"partitionID": partitionID,
 		}).Infof("OCF: Writing records to S3")
-		numRecords, err = ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{})
+		avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{})
 		if err != nil {
-			return 0, "", fmt.Errorf("failed to write records to S3: %w", err)
+			return nil, fmt.Errorf("failed to write records to S3: %w", err)
 		}

-		return numRecords, "", nil
+		return avroFile, nil
 	}

-	return 0, "", fmt.Errorf("unsupported staging path: %s", s.config.StagingPath)
+	return nil, fmt.Errorf("unsupported staging path: %s", s.config.StagingPath)
 }

-func (s *SnowflakeAvroSyncMethod) putFileToStage(localFilePath string, stage string) error {
-	if localFilePath == "" {
+func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage string) error {
+	if avroFile.StorageLocation != avro.AvroLocalStorage {
 		log.Infof("no file to put to stage")
 		return nil
 	}

 	activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")
-	putCmd := fmt.Sprintf("PUT file://%s @%s", localFilePath, stage)
+	putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)

-	sutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
+	shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
 		return fmt.Sprintf("putting file to stage %s", stage)
 	})

 	defer func() {
-		sutdown <- true
+		shutdown <- true
 	}()

 	if _, err := s.connector.database.Exec(putCmd); err != nil {
 		return fmt.Errorf("failed to put file to stage: %w", err)
 	}

-	log.Infof("put file %s to stage %s", localFilePath, stage)
+	log.Infof("put file %s to stage %s", avroFile.FilePath, stage)
 	return nil
 }
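On the review comment above (an error partway through writing can strand a partial local file, since the deferred `Cleanup` only runs after the writer returns successfully): one way to close that gap, which this commit does not do, is to have the local writer delete its own output on failure. A sketch under the same assumed `AvroFile` type, where `writeOCF` stands in for the real OCF-encoding routine:

```go
package avro

import (
	"fmt"
	"io"
	"os"
)

// Sketch addressing the review comment: if writing fails partway, remove
// the partial file before returning the error, so the error path cleans
// up even though the caller never reaches `defer avroFile.Cleanup()`.
func writeLocalAvroFile(path string, writeOCF func(io.Writer) (int, error)) (*AvroFile, error) {
	f, err := os.Create(path)
	if err != nil {
		return nil, fmt.Errorf("failed to create local Avro file: %w", err)
	}

	numRecords, writeErr := writeOCF(f)
	closeErr := f.Close()
	if writeErr != nil || closeErr != nil {
		_ = os.Remove(path) // best effort: do not strand a partial file
		if writeErr != nil {
			return nil, fmt.Errorf("failed to write records to local Avro file: %w", writeErr)
		}
		return nil, fmt.Errorf("failed to close local Avro file: %w", closeErr)
	}

	return &AvroFile{
		NumRecords:      numRecords,
		StorageLocation: AvroLocalStorage,
		FilePath:        path,
	}, nil
}
```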
