Skip to content

Commit

Permalink
Merge branch 'main' into rip-grafana
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Nov 23, 2023
2 parents 1b80c75 + 08e72c1 commit f111329
Show file tree
Hide file tree
Showing 20 changed files with 520 additions and 797 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,4 @@ jobs:
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10
9 changes: 1 addition & 8 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -1242,20 +1242,13 @@ func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) {

// grab an advisory lock based on the mirror jobs table hash
mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
_, err = tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl)

_, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl)
if err != nil {
err = tx.Rollback(c.ctx)
return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
}

return func() error {
// release the lock
_, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl)
if err != nil {
return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err)
}

err = tx.Commit(c.ctx)
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
Expand Down
162 changes: 73 additions & 89 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package connbigquery

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"

"cloud.google.com/go/bigquery"
"github.com/PeerDB-io/peer-flow/connectors/utils"
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/linkedin/goavro/v2"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
)
Expand Down Expand Up @@ -44,13 +43,13 @@ func (s *QRepAvroSyncMethod) SyncRecords(
flowJobName, dstTableName, syncBatchID),
)
// You will need to define your Avro schema as a string
avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata)
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}

stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, fmt.Sprint(syncBatchID))
numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream, nullable)
numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream)
if err != nil {
return -1, fmt.Errorf("failed to push to avro stage: %v", err)
}
Expand Down Expand Up @@ -106,18 +105,20 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
startTime := time.Now()

// You will need to define your Avro schema as a string
avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata)
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("Obtained Avro schema for destination table %s and partition ID %s",
dstTableName, partition.PartitionId)
fmt.Printf("Avro schema: %s\n", avroSchema)
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("Avro schema: %v\n", avroSchema)
// create a staging table name with partitionID replace hyphens with underscores
stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, strings.ReplaceAll(partition.PartitionId, "-", "_"))
numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream, nullable)
numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream)
if err != nil {
return -1, fmt.Errorf("failed to push to avro stage: %v", err)
}
Expand Down Expand Up @@ -182,14 +183,15 @@ type AvroSchema struct {
Fields []AvroField `json:"fields"`
}

func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetadata) (string, map[string]bool, error) {
func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := map[string]bool{}

for _, bqField := range dstTableMetadata.Schema {
avroType, err := GetAvroType(bqField)
if err != nil {
return "", nil, err
return nil, err
}

// If a field is nullable, its Avro type should be ["null", actualType]
Expand All @@ -212,10 +214,13 @@ func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetad

avroSchemaJSON, err := json.Marshal(avroSchema)
if err != nil {
return "", nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err)
return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err)
}

return string(avroSchemaJSON), nullableFields, nil
return &model.QRecordAvroSchemaDefinition{
Schema: string(avroSchemaJSON),
NullableFields: nullableFields,
}, nil
}

func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
Expand Down Expand Up @@ -306,10 +311,9 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
func (s *QRepAvroSyncMethod) writeToStage(
syncID string,
objectFolder string,
avroSchema string,
avroSchema *model.QRecordAvroSchemaDefinition,
stagingTable string,
stream *model.QRecordStream,
nullable map[string]bool,
) (int, error) {
shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute,
func() string {
Expand All @@ -320,95 +324,75 @@ func (s *QRepAvroSyncMethod) writeToStage(
defer func() {
shutdown <- true
}()
ctx := context.Background()
bucket := s.connector.storageClient.Bucket(s.gcsBucket)
gcsObjectName := fmt.Sprintf("%s/%s.avro", objectFolder, syncID)

obj := bucket.Object(gcsObjectName)
w := obj.NewWriter(ctx)

// Create OCF Writer
var ocfFileContents bytes.Buffer
ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: &ocfFileContents,
Schema: avroSchema,
})
if err != nil {
return 0, fmt.Errorf("failed to create OCF writer: %w", err)
}

schema, err := stream.Schema()
if err != nil {
log.WithFields(log.Fields{
"partitonOrBatchID": syncID,
}).Errorf("failed to get schema from stream: %v", err)
return 0, fmt.Errorf("failed to get schema from stream: %w", err)
}
var avroFilePath string
numRecords, err := func() (int, error) {
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
avro.CompressNone, qvalue.QDWHTypeBigQuery)
if s.gcsBucket != "" {
bucket := s.connector.storageClient.Bucket(s.gcsBucket)
avroFilePath = fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
obj := bucket.Object(avroFilePath)
w := obj.NewWriter(s.connector.ctx)

numRecords, err := ocfWriter.WriteOCF(w)
if err != nil {
return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
}
return numRecords, err
} else {
tmpDir, err := os.MkdirTemp("", "peerdb-avro")
if err != nil {
return 0, fmt.Errorf("failed to create temp dir: %w", err)
}

activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
"Obtained staging bucket %s and schema of rows. Now writing records to OCF file.",
gcsObjectName),
)
numRecords := 0
// Write each QRecord to the OCF file
for qRecordOrErr := range stream.Records {
if numRecords > 0 && numRecords%10000 == 0 {
activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
"Written %d records to OCF file for staging bucket %s.",
numRecords, gcsObjectName),
)
}
if qRecordOrErr.Err != nil {
avroFilePath = fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
log.WithFields(log.Fields{
"batchOrPartitionID": syncID,
}).Errorf("[bq_avro] failed to get record from stream: %v", qRecordOrErr.Err)
return 0, fmt.Errorf("[bq_avro] failed to get record from stream: %w", qRecordOrErr.Err)
}

qRecord := qRecordOrErr.Record
avroConverter := model.NewQRecordAvroConverter(
qRecord,
qvalue.QDWHTypeBigQuery,
&nullable,
schema.GetColumnNames(),
)
avroMap, err := avroConverter.Convert()
if err != nil {
return 0, fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
}

err = ocfWriter.Append([]interface{}{avroMap})
if err != nil {
return 0, fmt.Errorf("failed to write record to OCF file: %w", err)
}).Infof("writing records to local file %s", avroFilePath)
numRecords, err := ocfWriter.WriteRecordsToAvroFile(avroFilePath)
if err != nil {
return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
}
return numRecords, err
}
numRecords++
}
activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
"Writing OCF contents to BigQuery for partition/batch ID %s",
syncID),
)
// Write OCF contents to GCS
if _, err = w.Write(ocfFileContents.Bytes()); err != nil {
return 0, fmt.Errorf("failed to write OCF file to GCS: %w", err)
}()
if err != nil {
return 0, err
}

if err := w.Close(); err != nil {
return 0, fmt.Errorf("failed to close GCS object writer: %w", err)
if numRecords == 0 {
return 0, nil
}
log.WithFields(log.Fields{
"batchOrPartitionID": syncID,
}).Infof("wrote %d records to file %s", numRecords, avroFilePath)

// write this file to bigquery
gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, gcsObjectName))
gcsRef.SourceFormat = bigquery.Avro
bqClient := s.connector.client
datasetID := s.connector.datasetID
loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(gcsRef)
var avroRef bigquery.LoadSource
if s.gcsBucket != "" {
gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath))
gcsRef.SourceFormat = bigquery.Avro
gcsRef.Compression = bigquery.Deflate
avroRef = gcsRef
} else {
fh, err := os.Open(avroFilePath)
if err != nil {
return 0, fmt.Errorf("failed to read local Avro file: %w", err)
}
localRef := bigquery.NewReaderSource(fh)
localRef.SourceFormat = bigquery.Avro
avroRef = localRef
}

loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(avroRef)
loader.UseAvroLogicalTypes = true
job, err := loader.Run(ctx)
job, err := loader.Run(s.connector.ctx)
if err != nil {
return 0, fmt.Errorf("failed to run BigQuery load job: %w", err)
}

status, err := job.Wait(ctx)
status, err := job.Wait(s.connector.ctx)
if err != nil {
return 0, fmt.Errorf("failed to wait for BigQuery load job: %w", err)
}
Expand All @@ -417,6 +401,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
}
log.Printf("Pushed into %s/%s",
gcsObjectName, syncID)
avroFilePath, syncID)
return numRecords, nil
}
3 changes: 2 additions & 1 deletion flow/connectors/s3/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -62,7 +63,7 @@ func (c *S3Connector) writeToAvroFile(
}

s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID)
writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema)
writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake)
numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds)
if err != nil {
return 0, fmt.Errorf("failed to write records to S3: %w", err)
Expand Down
Loading

0 comments on commit f111329

Please sign in to comment.