Skip to content

Commit

Permalink
Merge branch 'main' into remove-unused-code
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Dec 7, 2023
2 parents 3da8ac6 + 1a1d5a7 commit 990f9ba
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 93 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
}

close(recordStream.Records)
avroSync := NewQRepAvroSyncMethod(c, req.StagingPath)
avroSync := NewQRepAvroSyncMethod(c, req.StagingPath, req.FlowJobName)
rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of destination table: %v", err)
Expand Down
90 changes: 47 additions & 43 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
)

type QRepAvroSyncMethod struct {
connector *BigQueryConnector
gcsBucket string
connector *BigQueryConnector
gcsBucket string
flowJobName string
}

func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string) *QRepAvroSyncMethod {
func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string,
flowJobName string) *QRepAvroSyncMethod {
return &QRepAvroSyncMethod{
connector: connector,
gcsBucket: gcsBucket,
connector: connector,
gcsBucket: gcsBucket,
flowJobName: flowJobName,
}
}

Expand Down Expand Up @@ -325,58 +328,59 @@ func (s *QRepAvroSyncMethod) writeToStage(
shutdown <- true
}()

var avroFilePath string
numRecords, err := func() (int, error) {
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
avro.CompressNone, qvalue.QDWHTypeBigQuery)
if s.gcsBucket != "" {
bucket := s.connector.storageClient.Bucket(s.gcsBucket)
avroFilePath = fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
obj := bucket.Object(avroFilePath)
w := obj.NewWriter(s.connector.ctx)

numRecords, err := ocfWriter.WriteOCF(w)
if err != nil {
return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
}
return numRecords, err
} else {
tmpDir, err := os.MkdirTemp("", "peerdb-avro")
if err != nil {
return 0, fmt.Errorf("failed to create temp dir: %w", err)
}
var avroFile *avro.AvroFile
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
avro.CompressNone, qvalue.QDWHTypeBigQuery)
if s.gcsBucket != "" {
bucket := s.connector.storageClient.Bucket(s.gcsBucket)
avroFilePath := fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
obj := bucket.Object(avroFilePath)
w := obj.NewWriter(s.connector.ctx)

avroFilePath = fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
log.WithFields(log.Fields{
"batchOrPartitionID": syncID,
}).Infof("writing records to local file %s", avroFilePath)
numRecords, err := ocfWriter.WriteRecordsToAvroFile(avroFilePath)
if err != nil {
return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
}
return numRecords, err
numRecords, err := ocfWriter.WriteOCF(w)
if err != nil {
return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
}
avroFile = &avro.AvroFile{
NumRecords: numRecords,
StorageLocation: avro.AvroGCSStorage,
FilePath: avroFilePath,
}
} else {
tmpDir := fmt.Sprintf("%s/peerdb-avro-%s", os.TempDir(), s.flowJobName)
err := os.MkdirAll(tmpDir, os.ModePerm)
if err != nil {
return 0, fmt.Errorf("failed to create temp dir: %w", err)
}

avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
log.WithFields(log.Fields{
"batchOrPartitionID": syncID,
}).Infof("writing records to local file %s", avroFilePath)
avroFile, err = ocfWriter.WriteRecordsToAvroFile(avroFilePath)
if err != nil {
return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
}
}()
if err != nil {
return 0, err
}
if numRecords == 0 {
defer avroFile.Cleanup()

if avroFile.NumRecords == 0 {
return 0, nil
}
log.WithFields(log.Fields{
"batchOrPartitionID": syncID,
}).Infof("wrote %d records to file %s", numRecords, avroFilePath)
}).Infof("wrote %d records to file %s", avroFile.NumRecords, avroFile.FilePath)

bqClient := s.connector.client
datasetID := s.connector.datasetID
var avroRef bigquery.LoadSource
if s.gcsBucket != "" {
gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath))
gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFile.FilePath))
gcsRef.SourceFormat = bigquery.Avro
gcsRef.Compression = bigquery.Deflate
avroRef = gcsRef
} else {
fh, err := os.Open(avroFilePath)
fh, err := os.Open(avroFile.FilePath)
if err != nil {
return 0, fmt.Errorf("failed to read local Avro file: %w", err)
}
Expand All @@ -401,6 +405,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
}
log.Printf("Pushed into %s/%s",
avroFilePath, syncID)
return numRecords, nil
avroFile.FilePath, syncID)
return avroFile.NumRecords, nil
}
4 changes: 4 additions & 0 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
// treat all relation messages as corresponding to parent if partitioned.
msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)

if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists {
return nil, nil
}

// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
// the state of the relation message somewhere
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/s3/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ func (c *S3Connector) writeToAvroFile(

s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID)
writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake)
numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds)
avroFile, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds)
if err != nil {
return 0, fmt.Errorf("failed to write records to S3: %w", err)
}
defer avroFile.Cleanup()

return numRecords, nil
return avroFile.NumRecords, nil
}

// S3 just sets up destination, not metadata tables
Expand Down
66 changes: 27 additions & 39 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,15 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
}

partitionID := util.RandomString(16)
numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
if err != nil {
return 0, err
}
defer avroFile.Cleanup()
log.WithFields(log.Fields{
"destinationTable": dstTableName,
"flowName": flowJobName,
}).Infof("written %d records to Avro file", numRecords)
}).Infof("written %d records to Avro file", avroFile.NumRecords)

stage := s.connector.getStageNameForJob(s.config.FlowJobName)
err = s.connector.createStage(stage, s.config)
Expand All @@ -85,7 +86,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
}

allCols := colInfo.Columns
err = s.putFileToStage(localFilePath, stage)
err = s.putFileToStage(avroFile, stage)
if err != nil {
return 0, err
}
Expand All @@ -101,7 +102,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
"destinationTable": dstTableName,
}).Infof("copying records into %s from stage %s", s.config.DestinationTableIdentifier, stage)

return numRecords, nil
return avroFile.NumRecords, nil
}

func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
Expand Down Expand Up @@ -138,28 +139,15 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
return 0, err
}

numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName)
avroFile, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName)
if err != nil {
return 0, err
}

if localFilePath != "" {
defer func() {
log.Infof("removing temp file %s", localFilePath)
err := os.Remove(localFilePath)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
"partitionID": partition.PartitionId,
"destinationTable": dstTableName,
}).Errorf("failed to remove temp file %s: %v", localFilePath, err)
}
}()
}
defer avroFile.Cleanup()

stage := s.connector.getStageNameForJob(config.FlowJobName)

err = s.putFileToStage(localFilePath, stage)
err = s.putFileToStage(avroFile, stage)
if err != nil {
return 0, err
}
Expand All @@ -175,7 +163,7 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(

activity.RecordHeartbeat(s.connector.ctx, "finished syncing records")

return numRecords, nil
return avroFile.NumRecords, nil
}

func (s *SnowflakeAvroSyncMethod) addMissingColumns(
Expand Down Expand Up @@ -271,73 +259,73 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(
avroSchema *model.QRecordAvroSchemaDefinition,
partitionID string,
flowJobName string,
) (int, string, error) {
var numRecords int
) (*avro.AvroFile, error) {
if s.config.StagingPath == "" {
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
qvalue.QDWHTypeSnowflake)
tmpDir, err := os.MkdirTemp("", "peerdb-avro")
tmpDir := fmt.Sprintf("%s/peerdb-avro-%s", os.TempDir(), flowJobName)
err := os.MkdirAll(tmpDir, os.ModePerm)
if err != nil {
return 0, "", fmt.Errorf("failed to create temp dir: %w", err)
return nil, fmt.Errorf("failed to create temp dir: %w", err)
}

localFilePath := fmt.Sprintf("%s/%s.avro.zst", tmpDir, partitionID)
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partitionID,
}).Infof("writing records to local file %s", localFilePath)
numRecords, err = ocfWriter.WriteRecordsToAvroFile(localFilePath)
avroFile, err := ocfWriter.WriteRecordsToAvroFile(localFilePath)
if err != nil {
return 0, "", fmt.Errorf("failed to write records to Avro file: %w", err)
return nil, fmt.Errorf("failed to write records to Avro file: %w", err)
}

return numRecords, localFilePath, nil
return avroFile, nil
} else if strings.HasPrefix(s.config.StagingPath, "s3://") {
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
qvalue.QDWHTypeSnowflake)
s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath)
if err != nil {
return 0, "", fmt.Errorf("failed to parse staging path: %w", err)
return nil, fmt.Errorf("failed to parse staging path: %w", err)
}

s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, s.config.FlowJobName, partitionID)
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partitionID,
}).Infof("OCF: Writing records to S3")
numRecords, err = ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{})
avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{})
if err != nil {
return 0, "", fmt.Errorf("failed to write records to S3: %w", err)
return nil, fmt.Errorf("failed to write records to S3: %w", err)
}

return numRecords, "", nil
return avroFile, nil
}

return 0, "", fmt.Errorf("unsupported staging path: %s", s.config.StagingPath)
return nil, fmt.Errorf("unsupported staging path: %s", s.config.StagingPath)
}

func (s *SnowflakeAvroSyncMethod) putFileToStage(localFilePath string, stage string) error {
if localFilePath == "" {
func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage string) error {
if avroFile.StorageLocation != avro.AvroLocalStorage {
log.Infof("no file to put to stage")
return nil
}

activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")
putCmd := fmt.Sprintf("PUT file://%s @%s", localFilePath, stage)
putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)

sutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
return fmt.Sprintf("putting file to stage %s", stage)
})

defer func() {
sutdown <- true
shutdown <- true
}()

if _, err := s.connector.database.Exec(putCmd); err != nil {
return fmt.Errorf("failed to put file to stage: %w", err)
}

log.Infof("put file %s to stage %s", localFilePath, stage)
log.Infof("put file %s to stage %s", avroFile.FilePath, stage)
return nil
}

Expand Down
Loading

0 comments on commit 990f9ba

Please sign in to comment.