test fixing with a side of BigQuery features #673

Merged · 13 commits · Nov 23, 2023
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
@@ -120,4 +120,4 @@ jobs:
 PEERDB_CATALOG_USER: postgres
 PEERDB_CATALOG_PASSWORD: postgres
 PEERDB_CATALOG_DATABASE: postgres
-PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3
+PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10
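For context on the config tweak above: PEERDB_CDC_IDLE_TIMEOUT_SECONDS appears to bound how long a CDC sync sits idle waiting for new records before it flushes the current batch, so raising it from 3 to 10 seconds gives the CI mirrors more headroom before a flush is forced. A generic sketch of that kind of idle-timeout batching loop (illustrative only; the names and channel shape are assumptions, not PeerDB's actual implementation):

package main

import (
	"fmt"
	"time"
)

// drainBatch accumulates records until the source has been idle for
// idleTimeout, then returns the batch so the caller can flush it.
func drainBatch(records <-chan string, idleTimeout time.Duration) []string {
	var batch []string
	idle := time.NewTimer(idleTimeout)
	defer idle.Stop()
	for {
		select {
		case rec, ok := <-records:
			if !ok {
				return batch // source closed, flush what we have
			}
			batch = append(batch, rec)
			// restart the idle clock on every record
			if !idle.Stop() {
				<-idle.C
			}
			idle.Reset(idleTimeout)
		case <-idle.C:
			return batch // idle too long, flush
		}
	}
}

func main() {
	ch := make(chan string, 2)
	ch <- "row-1"
	ch <- "row-2"
	close(ch)
	fmt.Println(drainBatch(ch, 10*time.Second)) // [row-1 row-2]
}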
9 changes: 1 addition & 8 deletions flow/connectors/bigquery/bigquery.go
@@ -1242,20 +1242,13 @@ func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) {
 
 	// grab an advisory lock based on the mirror jobs table hash
 	mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
-	_, err = tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl)
-
+	_, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl)
 	if err != nil {
 		err = tx.Rollback(c.ctx)
 		return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
 	}
 
 	return func() error {
-		// release the lock
-		_, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl)
-		if err != nil {
-			return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err)
-		}
-
 		err = tx.Commit(c.ctx)
 		if err != nil {
 			return fmt.Errorf("failed to commit transaction: %w", err)
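Worth noting on the lock change above: pg_advisory_xact_lock takes a transaction-scoped advisory lock that Postgres releases automatically at commit or rollback, which is why the explicit pg_advisory_unlock in the returned closure could be deleted outright (the session-scoped pg_advisory_lock would otherwise leak the lock on any early-return path that skips the unlock). A minimal sketch of the pattern, assuming pgx v5 in line with the tx.Exec(c.ctx, ...) calls above; the helper name and signature are hypothetical:

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// withAdvisoryLock serializes fn across processes on a string key.
// hashtext() folds the key into the integer space advisory locks use.
func withAdvisoryLock(ctx context.Context, pool *pgxpool.Pool, key string, fn func(pgx.Tx) error) error {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) // harmless after a successful Commit

	// Transaction-scoped: released automatically at COMMIT/ROLLBACK,
	// so there is no pg_advisory_unlock counterpart to forget.
	if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", key); err != nil {
		return fmt.Errorf("failed to grab lock on %s: %w", key, err)
	}
	if err := fn(tx); err != nil {
		return err
	}
	return tx.Commit(ctx) // commit releases the advisory lock
}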
162 changes: 73 additions & 89 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -1,19 +1,18 @@
 package connbigquery
 
 import (
-	"bytes"
-	"context"
 	"encoding/json"
 	"fmt"
+	"os"
 	"strings"
 	"time"
 
 	"cloud.google.com/go/bigquery"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
+	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
-	"github.com/linkedin/goavro/v2"
 	log "github.com/sirupsen/logrus"
 	"go.temporal.io/sdk/activity"
 )
@@ -44,13 +43,13 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 		flowJobName, dstTableName, syncBatchID),
 	)
 	// You will need to define your Avro schema as a string
-	avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata)
+	avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
 	if err != nil {
 		return 0, fmt.Errorf("failed to define Avro schema: %w", err)
 	}
 
 	stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, fmt.Sprint(syncBatchID))
-	numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream, nullable)
+	numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream)
 	if err != nil {
 		return -1, fmt.Errorf("failed to push to avro stage: %v", err)
 	}
@@ -106,18 +105,20 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	startTime := time.Now()
 
 	// You will need to define your Avro schema as a string
-	avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata)
+	avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
 	if err != nil {
 		return 0, fmt.Errorf("failed to define Avro schema: %w", err)
 	}
 	log.WithFields(log.Fields{
 		"flowName": flowJobName,
 	}).Infof("Obtained Avro schema for destination table %s and partition ID %s",
 		dstTableName, partition.PartitionId)
-	fmt.Printf("Avro schema: %s\n", avroSchema)
+	log.WithFields(log.Fields{
+		"flowName": flowJobName,
+	}).Infof("Avro schema: %v\n", avroSchema)
 	// create a staging table name with partitionID replace hyphens with underscores
 	stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, strings.ReplaceAll(partition.PartitionId, "-", "_"))
-	numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream, nullable)
+	numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream)
 	if err != nil {
 		return -1, fmt.Errorf("failed to push to avro stage: %v", err)
 	}
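The hyphen replacement above is needed because partition IDs are UUIDs, and underscore-only names are the safe choice for BigQuery staging tables. A quick illustration with a made-up partition ID:

package main

import (
	"fmt"
	"strings"
)

func main() {
	partitionID := "550e8400-e29b-41d4-a716-446655440000" // hypothetical partition ID
	stagingTable := fmt.Sprintf("%s_%s_staging", "public_users",
		strings.ReplaceAll(partitionID, "-", "_"))
	fmt.Println(stagingTable)
	// public_users_550e8400_e29b_41d4_a716_446655440000_staging
}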
Expand Down Expand Up @@ -182,14 +183,15 @@ type AvroSchema struct {
Fields []AvroField `json:"fields"`
}

func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetadata) (string, map[string]bool, error) {
func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := map[string]bool{}

for _, bqField := range dstTableMetadata.Schema {
avroType, err := GetAvroType(bqField)
if err != nil {
return "", nil, err
return nil, err
}

// If a field is nullable, its Avro type should be ["null", actualType]
Expand All @@ -212,10 +214,13 @@ func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetad

avroSchemaJSON, err := json.Marshal(avroSchema)
if err != nil {
return "", nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err)
return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err)
}

return string(avroSchemaJSON), nullableFields, nil
return &model.QRecordAvroSchemaDefinition{
Schema: string(avroSchemaJSON),
NullableFields: nullableFields,
}, nil
}

func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
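DefineAvroSchema now hands back a single value instead of the old (schema string, nullable map) pair, which is what lets SyncRecords and SyncQRepRecords above drop the separate nullable plumbing. Inferred from the return literal, the model type presumably looks like this (a sketch; the authoritative definition lives in peer-flow's model package):

type QRecordAvroSchemaDefinition struct {
	Schema         string          // Avro schema serialized as JSON
	NullableFields map[string]bool // column name -> wrapped as ["null", type]?
}

For a nullable STRING column, the generated schema JSON contains a union such as {"name": "col", "type": ["null", "string"]}, the standard Avro encoding for optional fields, which is what the NullableFields map records.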
@@ -306,10 +311,9 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
 func (s *QRepAvroSyncMethod) writeToStage(
 	syncID string,
 	objectFolder string,
-	avroSchema string,
+	avroSchema *model.QRecordAvroSchemaDefinition,
 	stagingTable string,
 	stream *model.QRecordStream,
-	nullable map[string]bool,
 ) (int, error) {
 	shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute,
 		func() string {
@@ -320,95 +324,75 @@ func (s *QRepAvroSyncMethod) writeToStage(
 	defer func() {
 		shutdown <- true
 	}()
-	ctx := context.Background()
-	bucket := s.connector.storageClient.Bucket(s.gcsBucket)
-	gcsObjectName := fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
-
-	obj := bucket.Object(gcsObjectName)
-	w := obj.NewWriter(ctx)
-
-	// Create OCF Writer
-	var ocfFileContents bytes.Buffer
-	ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
-		W:      &ocfFileContents,
-		Schema: avroSchema,
-	})
-	if err != nil {
-		return 0, fmt.Errorf("failed to create OCF writer: %w", err)
-	}
-
-	schema, err := stream.Schema()
-	if err != nil {
-		log.WithFields(log.Fields{
-			"partitonOrBatchID": syncID,
-		}).Errorf("failed to get schema from stream: %v", err)
-		return 0, fmt.Errorf("failed to get schema from stream: %w", err)
-	}
-	activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
-		"Obtained staging bucket %s and schema of rows. Now writing records to OCF file.",
-		gcsObjectName),
-	)
-	numRecords := 0
-	// Write each QRecord to the OCF file
-	for qRecordOrErr := range stream.Records {
-		if numRecords > 0 && numRecords%10000 == 0 {
-			activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
-				"Written %d records to OCF file for staging bucket %s.",
-				numRecords, gcsObjectName),
-			)
-		}
-		if qRecordOrErr.Err != nil {
-			log.WithFields(log.Fields{
-				"batchOrPartitionID": syncID,
-			}).Errorf("[bq_avro] failed to get record from stream: %v", qRecordOrErr.Err)
-			return 0, fmt.Errorf("[bq_avro] failed to get record from stream: %w", qRecordOrErr.Err)
-		}
-
-		qRecord := qRecordOrErr.Record
-		avroConverter := model.NewQRecordAvroConverter(
-			qRecord,
-			qvalue.QDWHTypeBigQuery,
-			&nullable,
-			schema.GetColumnNames(),
-		)
-		avroMap, err := avroConverter.Convert()
-		if err != nil {
-			return 0, fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
-		}
-
-		err = ocfWriter.Append([]interface{}{avroMap})
-		if err != nil {
-			return 0, fmt.Errorf("failed to write record to OCF file: %w", err)
-		}
-		numRecords++
-	}
-	activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
-		"Writing OCF contents to BigQuery for partition/batch ID %s",
-		syncID),
-	)
-	// Write OCF contents to GCS
-	if _, err = w.Write(ocfFileContents.Bytes()); err != nil {
-		return 0, fmt.Errorf("failed to write OCF file to GCS: %w", err)
-	}
-
-	if err := w.Close(); err != nil {
-		return 0, fmt.Errorf("failed to close GCS object writer: %w", err)
-	}
+	var avroFilePath string
+	numRecords, err := func() (int, error) {
+		ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
+			avro.CompressNone, qvalue.QDWHTypeBigQuery)
+		if s.gcsBucket != "" {
+			bucket := s.connector.storageClient.Bucket(s.gcsBucket)
+			avroFilePath = fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
+			obj := bucket.Object(avroFilePath)
+			w := obj.NewWriter(s.connector.ctx)
+
+			numRecords, err := ocfWriter.WriteOCF(w)
+			if err != nil {
+				return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
+			}
+			return numRecords, err
+		} else {
+			tmpDir, err := os.MkdirTemp("", "peerdb-avro")
+			if err != nil {
+				return 0, fmt.Errorf("failed to create temp dir: %w", err)
+			}
+
+			avroFilePath = fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
+			log.WithFields(log.Fields{
+				"batchOrPartitionID": syncID,
+			}).Infof("writing records to local file %s", avroFilePath)
+			numRecords, err := ocfWriter.WriteRecordsToAvroFile(avroFilePath)
+			if err != nil {
+				return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
+			}
+			return numRecords, err
+		}
+	}()
+	if err != nil {
+		return 0, err
+	}
+
+	if numRecords == 0 {
+		return 0, nil
+	}
+	log.WithFields(log.Fields{
+		"batchOrPartitionID": syncID,
+	}).Infof("wrote %d records to file %s", numRecords, avroFilePath)
 
 	// write this file to bigquery
-	gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, gcsObjectName))
-	gcsRef.SourceFormat = bigquery.Avro
 	bqClient := s.connector.client
 	datasetID := s.connector.datasetID
-	loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(gcsRef)
+	var avroRef bigquery.LoadSource
+	if s.gcsBucket != "" {
+		gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath))
+		gcsRef.SourceFormat = bigquery.Avro
+		gcsRef.Compression = bigquery.Deflate
+		avroRef = gcsRef
+	} else {
+		fh, err := os.Open(avroFilePath)
+		if err != nil {
+			return 0, fmt.Errorf("failed to read local Avro file: %w", err)
+		}
+		localRef := bigquery.NewReaderSource(fh)
+		localRef.SourceFormat = bigquery.Avro
+		avroRef = localRef
+	}
+
+	loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(avroRef)
 	loader.UseAvroLogicalTypes = true
-	job, err := loader.Run(ctx)
+	job, err := loader.Run(s.connector.ctx)
 	if err != nil {
 		return 0, fmt.Errorf("failed to run BigQuery load job: %w", err)
 	}
 
-	status, err := job.Wait(ctx)
+	status, err := job.Wait(s.connector.ctx)
 	if err != nil {
 		return 0, fmt.Errorf("failed to wait for BigQuery load job: %w", err)
 	}
Expand All @@ -417,6 +401,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
}
log.Printf("Pushed into %s/%s",
gcsObjectName, syncID)
avroFilePath, syncID)
return numRecords, nil
}
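The other structural change in writeToStage: all OCF encoding now goes through the shared avro.PeerDBOCFWriter helper, and when no GCS bucket is configured the file is staged on local disk and handed to BigQuery as a ReaderSource instead of a GCSReference. A self-contained sketch of that dual-path load using the standard cloud.google.com/go/bigquery API (the helper name and parameters are mine; PeerDB's version additionally sets Deflate compression on the GCS reference):

package bqload

import (
	"context"
	"fmt"
	"os"

	"cloud.google.com/go/bigquery"
)

// loadAvro loads one Avro file into datasetID.table: from GCS when gcsURI
// is non-empty, otherwise by streaming a local file with the load request.
func loadAvro(ctx context.Context, client *bigquery.Client,
	datasetID, table, gcsURI, localPath string) error {
	var src bigquery.LoadSource
	if gcsURI != "" {
		gcsRef := bigquery.NewGCSReference(gcsURI) // e.g. "gs://bucket/batch.avro"
		gcsRef.SourceFormat = bigquery.Avro
		src = gcsRef
	} else {
		fh, err := os.Open(localPath)
		if err != nil {
			return fmt.Errorf("failed to read local Avro file: %w", err)
		}
		localRef := bigquery.NewReaderSource(fh)
		localRef.SourceFormat = bigquery.Avro
		src = localRef
	}

	loader := client.Dataset(datasetID).Table(table).LoaderFrom(src)
	loader.UseAvroLogicalTypes = true // keep timestamps etc. as logical types
	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	if err := status.Err(); err != nil {
		return fmt.Errorf("load job completed with error: %w", err)
	}
	return nil
}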
3 changes: 2 additions & 1 deletion flow/connectors/s3/qrep.go
@@ -7,6 +7,7 @@ import (
 	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
+	"github.com/PeerDB-io/peer-flow/model/qvalue"
 	log "github.com/sirupsen/logrus"
 
@@ -62,7 +63,7 @@ func (c *S3Connector) writeToAvroFile(
 	}
 
 	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID)
-	writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema)
+	writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake)
 	numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds)
 	if err != nil {
 		return 0, fmt.Errorf("failed to write records to S3: %w", err)