Commit 28b155e

test fixing with a side of BigQuery features

heavycrystal committed Nov 16, 2023
1 parent e9c028d commit 28b155e

Showing 22 changed files with 361 additions and 541 deletions.
9 changes: 1 addition & 8 deletions flow/connectors/bigquery/bigquery.go
@@ -1242,20 +1242,13 @@ func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) {
 
 	// grab an advisory lock based on the mirror jobs table hash
 	mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
-	_, err = tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl)
-
+	_, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl)
 	if err != nil {
 		err = tx.Rollback(c.ctx)
 		return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
 	}
 
 	return func() error {
-		// release the lock
-		_, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl)
-		if err != nil {
-			return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err)
-		}
-
 		err = tx.Commit(c.ctx)
 		if err != nil {
 			return fmt.Errorf("failed to commit transaction: %w", err)
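Note on the change above: pg_advisory_lock takes a session-scoped lock that is held until a matching pg_advisory_unlock, so the old release path could leak the lock whenever the unlock was skipped on an error. pg_advisory_xact_lock is bound to the enclosing transaction and is released automatically on commit or rollback, which is why the explicit unlock disappears. Below is a minimal sketch of the pattern, assuming a pgx v5 connection (the tx.Exec(c.ctx, ...) call shape suggests pgx); withAdvisoryXactLock is an illustrative helper, not code from this commit.

```go
package lockexample

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// withAdvisoryXactLock runs fn while holding a transaction-scoped advisory
// lock keyed by hashtext(key). No explicit unlock is needed: Postgres
// releases pg_advisory_xact_lock when the transaction commits or rolls back.
func withAdvisoryXactLock(ctx context.Context, conn *pgx.Conn, key string, fn func(pgx.Tx) error) error {
	tx, err := conn.Begin(ctx)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Rollback is a no-op after a successful Commit; on error paths it both
	// aborts the transaction and releases the advisory lock.
	defer func() { _ = tx.Rollback(ctx) }()

	if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", key); err != nil {
		return fmt.Errorf("failed to grab lock on %s: %w", key, err)
	}
	if err := fn(tx); err != nil {
		return err
	}
	return tx.Commit(ctx) // lock released here
}
```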
158 changes: 69 additions & 89 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -1,19 +1,18 @@
 package connbigquery
 
 import (
-	"bytes"
-	"context"
 	"encoding/json"
 	"fmt"
+	"os"
 	"strings"
 	"time"
 
 	"cloud.google.com/go/bigquery"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
+	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
-	"github.com/linkedin/goavro/v2"
 	log "github.com/sirupsen/logrus"
 	"go.temporal.io/sdk/activity"
 )
@@ -44,13 +43,13 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 			flowJobName, dstTableName, syncBatchID),
 	)
 	// You will need to define your Avro schema as a string
-	avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata)
+	avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
 	if err != nil {
 		return 0, fmt.Errorf("failed to define Avro schema: %w", err)
 	}
 
 	stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, fmt.Sprint(syncBatchID))
-	numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream, nullable)
+	numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream)
 	if err != nil {
 		return -1, fmt.Errorf("failed to push to avro stage: %v", err)
 	}
@@ -106,18 +105,20 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	startTime := time.Now()
 
 	// You will need to define your Avro schema as a string
-	avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata)
+	avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
 	if err != nil {
 		return 0, fmt.Errorf("failed to define Avro schema: %w", err)
 	}
 	log.WithFields(log.Fields{
 		"flowName": flowJobName,
 	}).Infof("Obtained Avro schema for destination table %s and partition ID %s",
 		dstTableName, partition.PartitionId)
-	fmt.Printf("Avro schema: %s\n", avroSchema)
+	log.WithFields(log.Fields{
+		"flowName": flowJobName,
+	}).Infof("Avro schema: %v\n", avroSchema)
 	// create a staging table name with partitionID replace hyphens with underscores
 	stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, strings.ReplaceAll(partition.PartitionId, "-", "_"))
-	numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream, nullable)
+	numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream)
 	if err != nil {
 		return -1, fmt.Errorf("failed to push to avro stage: %v", err)
 	}
@@ -182,14 +183,15 @@ type AvroSchema struct {
 	Fields []AvroField `json:"fields"`
 }
 
-func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetadata) (string, map[string]bool, error) {
+func DefineAvroSchema(dstTableName string,
+	dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) {
 	avroFields := []AvroField{}
 	nullableFields := map[string]bool{}
 
 	for _, bqField := range dstTableMetadata.Schema {
 		avroType, err := GetAvroType(bqField)
 		if err != nil {
-			return "", nil, err
+			return nil, err
 		}
 
 		// If a field is nullable, its Avro type should be ["null", actualType]
@@ -212,10 +214,13 @@ func DefineAvroSchema(dstTableName string,
 
 	avroSchemaJSON, err := json.Marshal(avroSchema)
 	if err != nil {
-		return "", nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err)
+		return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err)
 	}
 
-	return string(avroSchemaJSON), nullableFields, nil
+	return &model.QRecordAvroSchemaDefinition{
+		Schema:         string(avroSchemaJSON),
+		NullableFields: nullableFields,
+	}, nil
 }
 
 func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
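For readers following the signature change: the new return type bundles the schema JSON together with the set of nullable columns, so writeToStage can no longer be handed a mismatched (schema, nullable) pair. A sketch of the presumed shape follows; only the two field names are confirmed by this diff, and the real definition lives in the model package.

```go
package model

// QRecordAvroSchemaDefinition (assumed shape, reconstructed from the
// struct literal returned by DefineAvroSchema above).
type QRecordAvroSchemaDefinition struct {
	Schema         string          // Avro schema serialized as JSON
	NullableFields map[string]bool // column name -> true if nullable
}
```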
@@ -306,10 +311,9 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
 func (s *QRepAvroSyncMethod) writeToStage(
 	syncID string,
 	objectFolder string,
-	avroSchema string,
+	avroSchema *model.QRecordAvroSchemaDefinition,
 	stagingTable string,
 	stream *model.QRecordStream,
-	nullable map[string]bool,
 ) (int, error) {
 	shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute,
 		func() string {
@@ -320,95 +324,71 @@ func (s *QRepAvroSyncMethod) writeToStage(
 	defer func() {
 		shutdown <- true
 	}()
-	ctx := context.Background()
-	bucket := s.connector.storageClient.Bucket(s.gcsBucket)
-	gcsObjectName := fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
-
-	obj := bucket.Object(gcsObjectName)
-	w := obj.NewWriter(ctx)
-
-	// Create OCF Writer
-	var ocfFileContents bytes.Buffer
-	ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
-		W:      &ocfFileContents,
-		Schema: avroSchema,
-	})
-	if err != nil {
-		return 0, fmt.Errorf("failed to create OCF writer: %w", err)
-	}
-
-	schema, err := stream.Schema()
-	if err != nil {
-		log.WithFields(log.Fields{
-			"partitonOrBatchID": syncID,
-		}).Errorf("failed to get schema from stream: %v", err)
-		return 0, fmt.Errorf("failed to get schema from stream: %w", err)
-	}
+	var avroFilePath string
+	numRecords, err := func() (int, error) {
+		ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
+			avro.CompressSnappy, qvalue.QDWHTypeBigQuery)
+		if s.gcsBucket != "" {
+			bucket := s.connector.storageClient.Bucket(s.gcsBucket)
+			avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", objectFolder, syncID)
+			obj := bucket.Object(avroFilePath)
+			w := obj.NewWriter(s.connector.ctx)
 
-	activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
-		"Obtained staging bucket %s and schema of rows. Now writing records to OCF file.",
-		gcsObjectName),
-	)
-	numRecords := 0
-	// Write each QRecord to the OCF file
-	for qRecordOrErr := range stream.Records {
-		if numRecords > 0 && numRecords%10000 == 0 {
-			activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
-				"Written %d records to OCF file for staging bucket %s.",
-				numRecords, gcsObjectName),
-			)
-		}
-		if qRecordOrErr.Err != nil {
+			numRecords, err := ocfWriter.WriteOCF(w)
+			if err != nil {
+				return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
+			}
+			return numRecords, err
+		} else {
+			tmpDir, err := os.MkdirTemp("", "peerdb-avro")
+			if err != nil {
+				return 0, fmt.Errorf("failed to create temp dir: %w", err)
+			}
+
+			avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", tmpDir, syncID)
 			log.WithFields(log.Fields{
 				"batchOrPartitionID": syncID,
-			}).Errorf("[bq_avro] failed to get record from stream: %v", qRecordOrErr.Err)
-			return 0, fmt.Errorf("[bq_avro] failed to get record from stream: %w", qRecordOrErr.Err)
-		}
-
-		qRecord := qRecordOrErr.Record
-		avroConverter := model.NewQRecordAvroConverter(
-			qRecord,
-			qvalue.QDWHTypeBigQuery,
-			&nullable,
-			schema.GetColumnNames(),
-		)
-		avroMap, err := avroConverter.Convert()
-		if err != nil {
-			return 0, fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
-		}
+			}).Infof("writing records to local file %s", avroFilePath)
+			numRecords, err := ocfWriter.WriteRecordsToAvroFile(avroFilePath)
+			if err != nil {
+				return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
+			}
+			return numRecords, err
+		}
+	}()
+	if err != nil {
+		return 0, err
+	}
+	log.WithFields(log.Fields{
+		"batchOrPartitionID": syncID,
+	}).Infof("wrote %d records to file %s", numRecords, avroFilePath)
 
-		err = ocfWriter.Append([]interface{}{avroMap})
-		if err != nil {
-			return 0, fmt.Errorf("failed to write record to OCF file: %w", err)
-		}
-		numRecords++
-	}
-	activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf(
-		"Writing OCF contents to BigQuery for partition/batch ID %s",
-		syncID),
-	)
-	// Write OCF contents to GCS
-	if _, err = w.Write(ocfFileContents.Bytes()); err != nil {
-		return 0, fmt.Errorf("failed to write OCF file to GCS: %w", err)
-	}
+	bqClient := s.connector.client
+	datasetID := s.connector.datasetID
+	var avroRef bigquery.LoadSource
+	if s.gcsBucket != "" {
+		gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath))
+		gcsRef.SourceFormat = bigquery.Avro
+		avroRef = gcsRef
+	} else {
+		fh, err := os.Open(avroFilePath)
+		if err != nil {
+			return 0, fmt.Errorf("failed to read local Avro file: %w", err)
+		}
+		localRef := bigquery.NewReaderSource(fh)
+		localRef.SourceFormat = bigquery.Avro
+		avroRef = localRef
+	}
 
-	if err := w.Close(); err != nil {
-		return 0, fmt.Errorf("failed to close GCS object writer: %w", err)
-	}
-
-	// write this file to bigquery
-	gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, gcsObjectName))
-	gcsRef.SourceFormat = bigquery.Avro
-	bqClient := s.connector.client
-	datasetID := s.connector.datasetID
-	loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(gcsRef)
+	loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(avroRef)
 	loader.UseAvroLogicalTypes = true
-	job, err := loader.Run(ctx)
+	job, err := loader.Run(s.connector.ctx)
 	if err != nil {
 		return 0, fmt.Errorf("failed to run BigQuery load job: %w", err)
 	}
 
-	status, err := job.Wait(ctx)
+	status, err := job.Wait(s.connector.ctx)
 	if err != nil {
 		return 0, fmt.Errorf("failed to wait for BigQuery load job: %w", err)
 	}
@@ -417,6 +397,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
 		return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
 	}
 	log.Printf("Pushed into %s/%s",
-		gcsObjectName, syncID)
+		avroFilePath, syncID)
 	return numRecords, nil
 }
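A note on the load path above: in cloud.google.com/go/bigquery, both *bigquery.GCSReference and *bigquery.ReaderSource satisfy the bigquery.LoadSource interface, which is what lets a single loader handle Avro staged either in GCS or on local disk. Here is a self-contained sketch of that pattern; bucket, dataset, table, and file paths are illustrative.

```go
package bqload

import (
	"context"
	"os"

	"cloud.google.com/go/bigquery"
)

// loadAvro loads an Avro file into a staging table from either GCS or a
// local file; both source types implement bigquery.LoadSource.
func loadAvro(ctx context.Context, client *bigquery.Client, useGCS bool) error {
	var src bigquery.LoadSource
	if useGCS {
		gcsRef := bigquery.NewGCSReference("gs://my-bucket/staging/batch.avro.snappy")
		gcsRef.SourceFormat = bigquery.Avro
		src = gcsRef
	} else {
		fh, err := os.Open("/tmp/batch.avro.snappy")
		if err != nil {
			return err
		}
		defer fh.Close()
		rs := bigquery.NewReaderSource(fh)
		rs.SourceFormat = bigquery.Avro
		src = rs
	}
	loader := client.Dataset("my_dataset").Table("staging_table").LoaderFrom(src)
	loader.UseAvroLogicalTypes = true // preserve timestamps, decimals, etc.
	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	return status.Err() // nil on success
}
```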
3 changes: 2 additions & 1 deletion flow/connectors/s3/qrep.go
@@ -7,6 +7,7 @@ import (
 	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
+	"github.com/PeerDB-io/peer-flow/model/qvalue"
 	log "github.com/sirupsen/logrus"
 )
@@ -62,7 +63,7 @@ func (c *S3Connector) writeToAvroFile(
 	}
 
 	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID)
-	writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema)
+	writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake)
 	numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds)
 	if err != nil {
 		return 0, fmt.Errorf("failed to write records to S3: %w", err)
66 changes: 63 additions & 3 deletions flow/connectors/snowflake/avro_file_writer_test.go
@@ -1,6 +1,7 @@
 package connsnowflake
 
 import (
+	"context"
 	"fmt"
 	"math/big"
 	"os"
@@ -142,7 +143,64 @@ func TestWriteRecordsToAvroFileHappyPath(t *testing.T) {
 	fmt.Printf("[test] avroSchema: %v\n", avroSchema)
 
 	// Call function
-	writer := avro.NewPeerDBOCFWriter(nil, records, avroSchema)
+	writer := avro.NewPeerDBOCFWriter(context.Background(),
+		records, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake)
 	_, err = writer.WriteRecordsToAvroFile(tmpfile.Name())
 	require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors")
 
+	// Check file is not empty
+	info, err := tmpfile.Stat()
+	require.NoError(t, err)
+	require.NotZero(t, info.Size(), "expected file to not be empty")
+}
+
+func TestWriteRecordsToZstdAvroFileHappyPath(t *testing.T) {
+	// Create temporary file
+	tmpfile, err := os.CreateTemp("", "example_*.avro.zst")
+	require.NoError(t, err)
+
+	defer os.Remove(tmpfile.Name()) // clean up
+	defer tmpfile.Close()           // close file after test ends
+
+	// Define sample data
+	records, schema := generateRecords(t, true, 10, false)
+
+	avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
+	require.NoError(t, err)
+
+	fmt.Printf("[test] avroSchema: %v\n", avroSchema)
+
+	// Call function
+	writer := avro.NewPeerDBOCFWriter(context.Background(),
+		records, avroSchema, avro.CompressZstd, qvalue.QDWHTypeSnowflake)
+	_, err = writer.WriteRecordsToAvroFile(tmpfile.Name())
+	require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors")
+
+	// Check file is not empty
+	info, err := tmpfile.Stat()
+	require.NoError(t, err)
+	require.NotZero(t, info.Size(), "expected file to not be empty")
+}
+
+func TestWriteRecordsToDeflateAvroFileHappyPath(t *testing.T) {
+	// Create temporary file
+	tmpfile, err := os.CreateTemp("", "example_*.avro.zz")
+	require.NoError(t, err)
+
+	defer os.Remove(tmpfile.Name()) // clean up
+	defer tmpfile.Close()           // close file after test ends
+
+	// Define sample data
+	records, schema := generateRecords(t, true, 10, false)
+
+	avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
+	require.NoError(t, err)
+
+	fmt.Printf("[test] avroSchema: %v\n", avroSchema)
+
+	// Call function
+	writer := avro.NewPeerDBOCFWriter(context.Background(),
+		records, avroSchema, avro.CompressDeflate, qvalue.QDWHTypeSnowflake)
+	_, err = writer.WriteRecordsToAvroFile(tmpfile.Name())
+	require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors")
+
@@ -168,7 +226,8 @@ func TestWriteRecordsToAvroFileNonNull(t *testing.T) {
 	fmt.Printf("[test] avroSchema: %v\n", avroSchema)
 
 	// Call function
-	writer := avro.NewPeerDBOCFWriter(nil, records, avroSchema)
+	writer := avro.NewPeerDBOCFWriter(context.Background(),
+		records, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake)
 	_, err = writer.WriteRecordsToAvroFile(tmpfile.Name())
 	require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors")
 
@@ -195,7 +254,8 @@ func TestWriteRecordsToAvroFileAllNulls(t *testing.T) {
 	fmt.Printf("[test] avroSchema: %v\n", avroSchema)
 
 	// Call function
-	writer := avro.NewPeerDBOCFWriter(nil, records, avroSchema)
+	writer := avro.NewPeerDBOCFWriter(context.Background(),
+		records, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake)
 	_, err = writer.WriteRecordsToAvroFile(tmpfile.Name())
 	require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors")
 
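The three happy-path tests above differ only in the compression constant and the file suffix. A table-driven variant would keep them in sync as codecs are added; this sketch reuses the helpers the tests already rely on (generateRecords, model.GetAvroSchemaDefinition), and the avro.AvroCompressionCodec type name is an assumption, since only the constants appear in this commit.

```go
package connsnowflake

import (
	"context"
	"os"
	"testing"

	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
	"github.com/PeerDB-io/peer-flow/model"
	"github.com/PeerDB-io/peer-flow/model/qvalue"
	"github.com/stretchr/testify/require"
)

func TestWriteRecordsToAvroFileCodecs(t *testing.T) {
	cases := []struct {
		name   string
		suffix string
		codec  avro.AvroCompressionCodec // type name assumed; the constants are from this commit
	}{
		{"none", ".avro", avro.CompressNone},
		{"zstd", ".avro.zst", avro.CompressZstd},
		{"deflate", ".avro.zz", avro.CompressDeflate},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			tmpfile, err := os.CreateTemp("", "example_*"+tc.suffix)
			require.NoError(t, err)
			defer os.Remove(tmpfile.Name()) // clean up
			defer tmpfile.Close()

			// Each subtest generates a fresh record stream, since a stream
			// can only be consumed once.
			records, schema := generateRecords(t, true, 10, false)
			avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
			require.NoError(t, err)

			writer := avro.NewPeerDBOCFWriter(context.Background(),
				records, avroSchema, tc.codec, qvalue.QDWHTypeSnowflake)
			_, err = writer.WriteRecordsToAvroFile(tmpfile.Name())
			require.NoError(t, err)

			info, err := tmpfile.Stat()
			require.NoError(t, err)
			require.NotZero(t, info.Size(), "expected file to not be empty")
		})
	}
}
```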