diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 776b6cc828..c216ac5233 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -1242,20 +1242,13 @@ func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { // grab an advisory lock based on the mirror jobs table hash mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) - _, err = tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl) - + _, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl) if err != nil { err = tx.Rollback(c.ctx) return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) } return func() error { - // release the lock - _, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl) - if err != nil { - return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err) - } - err = tx.Commit(c.ctx) if err != nil { return fmt.Errorf("failed to commit transaction: %w", err) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 8cb8af79ce..7a44352fc0 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -1,19 +1,18 @@ package connbigquery import ( - "bytes" - "context" "encoding/json" "fmt" + "os" "strings" "time" "cloud.google.com/go/bigquery" "github.com/PeerDB-io/peer-flow/connectors/utils" + avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/linkedin/goavro/v2" log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" ) @@ -44,13 +43,13 @@ func (s *QRepAvroSyncMethod) SyncRecords( flowJobName, dstTableName, syncBatchID), ) // You will need to define your Avro schema as a string - avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata) + avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata) if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, fmt.Sprint(syncBatchID)) - numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream, nullable) + numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream) if err != nil { return -1, fmt.Errorf("failed to push to avro stage: %v", err) } @@ -106,7 +105,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( startTime := time.Now() // You will need to define your Avro schema as a string - avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata) + avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata) if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } @@ -114,10 +113,12 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( "flowName": flowJobName, }).Infof("Obtained Avro schema for destination table %s and partition ID %s", dstTableName, partition.PartitionId) - fmt.Printf("Avro schema: %s\n", avroSchema) + log.WithFields(log.Fields{ + "flowName": flowJobName, + }).Infof("Avro schema: %v\n", avroSchema) // create a staging table name with partitionID replace hyphens with underscores stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, strings.ReplaceAll(partition.PartitionId, "-", "_")) - numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream, 
nullable) + numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream) if err != nil { return -1, fmt.Errorf("failed to push to avro stage: %v", err) } @@ -182,14 +183,15 @@ type AvroSchema struct { Fields []AvroField `json:"fields"` } -func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetadata) (string, map[string]bool, error) { +func DefineAvroSchema(dstTableName string, + dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) { avroFields := []AvroField{} nullableFields := map[string]bool{} for _, bqField := range dstTableMetadata.Schema { avroType, err := GetAvroType(bqField) if err != nil { - return "", nil, err + return nil, err } // If a field is nullable, its Avro type should be ["null", actualType] @@ -212,10 +214,13 @@ func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetad avroSchemaJSON, err := json.Marshal(avroSchema) if err != nil { - return "", nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err) + return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err) } - return string(avroSchemaJSON), nullableFields, nil + return &model.QRecordAvroSchemaDefinition{ + Schema: string(avroSchemaJSON), + NullableFields: nullableFields, + }, nil } func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) { @@ -306,10 +311,9 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) { func (s *QRepAvroSyncMethod) writeToStage( syncID string, objectFolder string, - avroSchema string, + avroSchema *model.QRecordAvroSchemaDefinition, stagingTable string, stream *model.QRecordStream, - nullable map[string]bool, ) (int, error) { shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute, func() string { @@ -320,95 +324,71 @@ func (s *QRepAvroSyncMethod) writeToStage( defer func() { shutdown <- true }() - ctx := context.Background() - bucket := s.connector.storageClient.Bucket(s.gcsBucket) - gcsObjectName := fmt.Sprintf("%s/%s.avro", objectFolder, syncID) - - obj := bucket.Object(gcsObjectName) - w := obj.NewWriter(ctx) - - // Create OCF Writer - var ocfFileContents bytes.Buffer - ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{ - W: &ocfFileContents, - Schema: avroSchema, - }) - if err != nil { - return 0, fmt.Errorf("failed to create OCF writer: %w", err) - } - schema, err := stream.Schema() - if err != nil { - log.WithFields(log.Fields{ - "partitonOrBatchID": syncID, - }).Errorf("failed to get schema from stream: %v", err) - return 0, fmt.Errorf("failed to get schema from stream: %w", err) - } + var avroFilePath string + numRecords, err := func() (int, error) { + ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, + avro.CompressSnappy, qvalue.QDWHTypeBigQuery) + if s.gcsBucket != "" { + bucket := s.connector.storageClient.Bucket(s.gcsBucket) + avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", objectFolder, syncID) + obj := bucket.Object(avroFilePath) + w := obj.NewWriter(s.connector.ctx) + + numRecords, err := ocfWriter.WriteOCF(w) + if err != nil { + return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err) + } + return numRecords, err + } else { + tmpDir, err := os.MkdirTemp("", "peerdb-avro") + if err != nil { + return 0, fmt.Errorf("failed to create temp dir: %w", err) + } - activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf( - "Obtained staging bucket %s and schema of rows. 
Now writing records to OCF file.", - gcsObjectName), - ) - numRecords := 0 - // Write each QRecord to the OCF file - for qRecordOrErr := range stream.Records { - if numRecords > 0 && numRecords%10000 == 0 { - activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf( - "Written %d records to OCF file for staging bucket %s.", - numRecords, gcsObjectName), - ) - } - if qRecordOrErr.Err != nil { + avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", tmpDir, syncID) log.WithFields(log.Fields{ "batchOrPartitionID": syncID, - }).Errorf("[bq_avro] failed to get record from stream: %v", qRecordOrErr.Err) - return 0, fmt.Errorf("[bq_avro] failed to get record from stream: %w", qRecordOrErr.Err) - } - - qRecord := qRecordOrErr.Record - avroConverter := model.NewQRecordAvroConverter( - qRecord, - qvalue.QDWHTypeBigQuery, - &nullable, - schema.GetColumnNames(), - ) - avroMap, err := avroConverter.Convert() - if err != nil { - return 0, fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err) + }).Infof("writing records to local file %s", avroFilePath) + numRecords, err := ocfWriter.WriteRecordsToAvroFile(avroFilePath) + if err != nil { + return 0, fmt.Errorf("failed to write records to local Avro file: %w", err) + } + return numRecords, err } + }() + if err != nil { + return 0, err + } + log.WithFields(log.Fields{ + "batchOrPartitionID": syncID, + }).Infof("wrote %d records to file %s", numRecords, avroFilePath) - err = ocfWriter.Append([]interface{}{avroMap}) + bqClient := s.connector.client + datasetID := s.connector.datasetID + var avroRef bigquery.LoadSource + if s.gcsBucket != "" { + gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath)) + gcsRef.SourceFormat = bigquery.Avro + avroRef = gcsRef + } else { + fh, err := os.Open(avroFilePath) if err != nil { - return 0, fmt.Errorf("failed to write record to OCF file: %w", err) + return 0, fmt.Errorf("failed to read local Avro file: %w", err) } - numRecords++ - } - activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf( - "Writing OCF contents to BigQuery for partition/batch ID %s", - syncID), - ) - // Write OCF contents to GCS - if _, err = w.Write(ocfFileContents.Bytes()); err != nil { - return 0, fmt.Errorf("failed to write OCF file to GCS: %w", err) + localRef := bigquery.NewReaderSource(fh) + localRef.SourceFormat = bigquery.Avro + avroRef = localRef } - if err := w.Close(); err != nil { - return 0, fmt.Errorf("failed to close GCS object writer: %w", err) - } - - // write this file to bigquery - gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, gcsObjectName)) - gcsRef.SourceFormat = bigquery.Avro - bqClient := s.connector.client - datasetID := s.connector.datasetID - loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(gcsRef) + loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(avroRef) loader.UseAvroLogicalTypes = true - job, err := loader.Run(ctx) + job, err := loader.Run(s.connector.ctx) if err != nil { return 0, fmt.Errorf("failed to run BigQuery load job: %w", err) } - status, err := job.Wait(ctx) + status, err := job.Wait(s.connector.ctx) if err != nil { return 0, fmt.Errorf("failed to wait for BigQuery load job: %w", err) } @@ -417,6 +397,6 @@ func (s *QRepAvroSyncMethod) writeToStage( return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err) } log.Printf("Pushed into %s/%s", - gcsObjectName, syncID) + avroFilePath, syncID) return numRecords, nil } diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 
b34f9a2cf3..1f1cf881da 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -7,6 +7,7 @@ import ( avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" log "github.com/sirupsen/logrus" ) @@ -62,7 +63,7 @@ func (c *S3Connector) writeToAvroFile( } s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID) - writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema) + writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake) numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds) if err != nil { return 0, fmt.Errorf("failed to write records to S3: %w", err) diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 77310c45db..76b70f478f 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -1,6 +1,7 @@ package connsnowflake import ( + "context" "fmt" "math/big" "os" @@ -142,7 +143,64 @@ func TestWriteRecordsToAvroFileHappyPath(t *testing.T) { fmt.Printf("[test] avroSchema: %v\n", avroSchema) // Call function - writer := avro.NewPeerDBOCFWriter(nil, records, avroSchema) + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake) + _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) + require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") + + // Check file is not empty + info, err := tmpfile.Stat() + require.NoError(t, err) + require.NotZero(t, info.Size(), "expected file to not be empty") +} + +func TestWriteRecordsToZstdAvroFileHappyPath(t *testing.T) { + // Create temporary file + tmpfile, err := os.CreateTemp("", "example_*.avro.zst") + require.NoError(t, err) + + defer os.Remove(tmpfile.Name()) // clean up + defer tmpfile.Close() // close file after test ends + + // Define sample data + records, schema := generateRecords(t, true, 10, false) + + avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) + require.NoError(t, err) + + fmt.Printf("[test] avroSchema: %v\n", avroSchema) + + // Call function + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressZstd, qvalue.QDWHTypeSnowflake) + _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) + require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") + + // Check file is not empty + info, err := tmpfile.Stat() + require.NoError(t, err) + require.NotZero(t, info.Size(), "expected file to not be empty") +} + +func TestWriteRecordsToDeflateAvroFileHappyPath(t *testing.T) { + // Create temporary file + tmpfile, err := os.CreateTemp("", "example_*.avro.zz") + require.NoError(t, err) + + defer os.Remove(tmpfile.Name()) // clean up + defer tmpfile.Close() // close file after test ends + + // Define sample data + records, schema := generateRecords(t, true, 10, false) + + avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) + require.NoError(t, err) + + fmt.Printf("[test] avroSchema: %v\n", avroSchema) + + // Call function + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressDeflate, qvalue.QDWHTypeSnowflake) _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile 
to complete without errors") @@ -168,7 +226,8 @@ func TestWriteRecordsToAvroFileNonNull(t *testing.T) { fmt.Printf("[test] avroSchema: %v\n", avroSchema) // Call function - writer := avro.NewPeerDBOCFWriter(nil, records, avroSchema) + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake) _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") @@ -195,7 +254,8 @@ func TestWriteRecordsToAvroFileAllNulls(t *testing.T) { fmt.Printf("[test] avroSchema: %v\n", avroSchema) // Call function - writer := avro.NewPeerDBOCFWriter(nil, records, avroSchema) + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake) _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index e74d0fdb2f..4631be6461 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -274,7 +274,7 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile( ) (int, string, error) { var numRecords int if s.config.StagingPath == "" { - ocfWriter := avro.NewPeerDBOCFWriterWithCompression(s.connector.ctx, stream, avroSchema) + ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeSnowflake) tmpDir, err := os.MkdirTemp("", "peerdb-avro") if err != nil { return 0, "", fmt.Errorf("failed to create temp dir: %w", err) @@ -292,13 +292,14 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile( return numRecords, localFilePath, nil } else if strings.HasPrefix(s.config.StagingPath, "s3://") { - ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema) + ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd, + qvalue.QDWHTypeSnowflake) s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath) if err != nil { return 0, "", fmt.Errorf("failed to parse staging path: %w", err) } - s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, s.config.FlowJobName, partitionID) + s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, s.config.FlowJobName, partitionID) log.WithFields(log.Fields{ "flowName": flowJobName, "partitionID": partitionID, diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 0b4cf09d7e..36c8858aa4 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -13,56 +13,67 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/klauspost/compress/flate" + "github.com/klauspost/compress/snappy" "github.com/klauspost/compress/zstd" "github.com/linkedin/goavro/v2" log "github.com/sirupsen/logrus" uber_atomic "go.uber.org/atomic" ) +type AvroCompressionCodec int64 + +const ( + CompressNone AvroCompressionCodec = iota + CompressZstd + CompressDeflate + CompressSnappy +) + type PeerDBOCFWriter struct { - ctx context.Context - stream *model.QRecordStream - avroSchema *model.QRecordAvroSchemaDefinition - compress bool - writer io.WriteCloser + ctx context.Context + stream *model.QRecordStream + avroSchema *model.QRecordAvroSchemaDefinition + avroCompressionCodec AvroCompressionCodec + writer io.WriteCloser + targetDWH qvalue.QDWHType } 
func NewPeerDBOCFWriter( ctx context.Context, stream *model.QRecordStream, avroSchema *model.QRecordAvroSchemaDefinition, + avroCompressionCodec AvroCompressionCodec, + targetDWH qvalue.QDWHType, ) *PeerDBOCFWriter { return &PeerDBOCFWriter{ - ctx: ctx, - stream: stream, - avroSchema: avroSchema, - compress: false, - } -} - -func NewPeerDBOCFWriterWithCompression( - ctx context.Context, - stream *model.QRecordStream, - avroSchema *model.QRecordAvroSchemaDefinition, -) *PeerDBOCFWriter { - return &PeerDBOCFWriter{ - ctx: ctx, - stream: stream, - avroSchema: avroSchema, - compress: true, + ctx: ctx, + stream: stream, + avroSchema: avroSchema, + avroCompressionCodec: avroCompressionCodec, + targetDWH: targetDWH, } } func (p *PeerDBOCFWriter) initWriteCloser(w io.Writer) error { var err error - if p.compress { + switch p.avroCompressionCodec { + case CompressNone: + p.writer = &nopWriteCloser{w} + case CompressZstd: p.writer, err = zstd.NewWriter(w) if err != nil { return fmt.Errorf("error while initializing zstd encoding writer: %w", err) } - } else { - p.writer = &nopWriteCloser{w} + case CompressDeflate: + p.writer, err = flate.NewWriter(w, -1) + if err != nil { + return fmt.Errorf("error while initializing deflate encoding writer: %w", err) + } + case CompressSnappy: + p.writer = snappy.NewBufferedWriter(w) } + return nil } @@ -115,7 +126,7 @@ func (p *PeerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) ( qRecord := qRecordOrErr.Record avroConverter := model.NewQRecordAvroConverter( qRecord, - qvalue.QDWHTypeSnowflake, + p.targetDWH, &p.avroSchema.NullableFields, colNames, ) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index be7f45ef4a..8d519b4f99 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -46,8 +46,6 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { {"Test_Toast_Advance_2_BQ", s.Test_Toast_Advance_2_BQ}, {"Test_Toast_Advance_3_BQ", s.Test_Toast_Advance_3_BQ}, {"Test_Types_BQ", s.Test_Types_BQ}, - {"Test_Types_Avro_BQ", s.Test_Types_Avro_BQ}, - {"Test_Simple_Flow_BQ_Avro_CDC", s.Test_Simple_Flow_BQ_Avro_CDC}, {"Test_Multi_Table_BQ", s.Test_Multi_Table_BQ}, {"Test_Simple_Schema_Changes_BQ", s.Test_Simple_Schema_Changes_BQ}, {"Test_Composite_PKey_BQ", s.Test_Composite_PKey_BQ}, @@ -65,6 +63,10 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { t.Run(tt.name, tt.test) } + + t.Cleanup(func() { + s.TearDownSuite() + }) } func (s *PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { @@ -185,6 +187,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -230,6 +234,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -278,6 +284,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: 
s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -347,13 +355,15 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -416,13 +426,15 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -478,6 +490,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -552,13 +566,15 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -621,13 +637,15 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -689,85 +707,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, - } - - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) - - limits := peerflow.CDCFlowLimits{ - - TotalSyncFlows: 1, - MaxBatchSize: 100, - } - - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - /* test inserting various types*/ - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s SELECT 2,2,b'1',b'101', - true,random_bytea(32),'s','test','1.1.10.2'::cidr, - CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, - '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, - 
'{"sai":1}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, - 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, - 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, - txid_current_snapshot(), - '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), - ARRAY[10299301,2579827], - ARRAY[0.0003, 8902.0092], - ARRAY['hello','bye']; - `, srcTableName)) - require.NoError(t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") - - noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{"c41", "c1", "c2", "c3", "c4", - "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", - "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44"}) - if err != nil { - fmt.Println("error %w", err) - } - // Make sure that there are no nulls - s.True(noNulls) - - env.AssertExpectations(s.T()) -} - -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { - t.Parallel() - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) - - srcTableName := s.attachSchemaSuffix("test_types_avro_bq") - dstTableName := "test_types_avro_bq" - - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, - c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, - c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, - c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, - c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); - `, srcTableName)) - require.NoError(t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_types_avro_bq"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, - Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, CdcStagingPath: "peerdb_staging", } @@ -777,7 +716,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -796,8 +735,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, txid_current_snapshot(), '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), - ARRAY[9301,239827], - ARRAY[0.0003, 1039.0034], + ARRAY[10299301,2579827], + ARRAY[0.0003, 8902.0092], ARRAY['hello','bye']; `, srcTableName)) require.NoError(t, err) @@ -826,72 +765,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC(t *testing.T) { - t.Parallel() - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) - - srcTableName := s.attachSchemaSuffix("test_simple_flow_bq_avro_cdc") - dstTableName := "test_simple_flow_bq_avro_cdc" - - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s 
( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - require.NoError(t, err) - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow_bq_avro_cdc"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, - Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", - } - - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) - - limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, - } - - go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - for i := 0; i < 10; i++ { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - require.NoError(t, err) - } - fmt.Println("Inserted 10 rows into the source table") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") - - count, err := s.bqHelper.countRows(dstTableName) - require.NoError(t, err) - s.Equal(10, count) - - // TODO: verify that the data is correctly synced to the destination table - // on the bigquery side - - env.AssertExpectations(s.T()) -} - func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { t.Parallel() env := s.NewTestWorkflowEnvironment() @@ -913,13 +786,15 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -975,6 +850,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1079,6 +956,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1154,6 +1033,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1232,6 +1113,8 @@ func (s *PeerFlowE2ETestSuiteBQ) 
Test_Composite_PKey_Toast_2_BQ(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "peerdb_staging", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 8bd4b6135f..9183762cde 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -68,7 +68,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { s.bqHelper.Peer, "peerdb_staging") s.NoError(err) - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -81,43 +81,3 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env.AssertExpectations(s.T()) } - -// NOTE: Disabled due to large JSON tests being added: https://github.com/PeerDB-io/peerdb/issues/309 - -// Test_Complete_QRep_Flow tests a complete flow with data in the source table. -// The test inserts 10 rows into the source table and verifies that the data is -// // correctly synced to the destination table this runs a QRep Flow. -// func (s *E2EPeerFlowTestSuite) Test_Complete_QRep_Flow_Multi_Insert() { -// env := s.NewTestWorkflowEnvironment() -// registerWorkflowsAndActivities(env) - -// numRows := 10 - -// tblName := "test_qrep_flow_multi_insert" -// s.setupSourceTable(tblName, numRows) -// s.setupBQDestinationTable(tblName) - -// query := fmt.Sprintf("SELECT * FROM e2e_test.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", tblName) - -// qrepConfig := s.createQRepWorkflowConfig("test_qrep_flow_mi", -// "e2e_test."+tblName, -// tblName, -// query, -// protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT, -// s.bqHelper.Peer) -// runQrepFlowWorkflow(env, qrepConfig) - -// // Verify workflow completes without error -// s.True(env.IsWorkflowCompleted()) - -// // assert that error contains "invalid connection configs" -// err := env.GetWorkflowError() -// s.NoError(err) - -// count, err := s.bqHelper.CountRows(tblName) -// s.NoError(err) - -// s.Equal(numRows, count) - -// env.AssertExpectations(s.T()) -// } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index df1653b992..52386711cf 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -171,7 +171,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { ) s.NoError(err) - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 2fca18a700..c3845f5f16 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -114,7 +114,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { s.NoError(err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -164,7 +164,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { qrepConfig.InitialCopyOnly = true qrepConfig.WatermarkColumn = "ctid" - 
e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 13efa18e75..219f37c493 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -41,7 +41,6 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { test func(t *testing.T) }{ {"Test_Complete_Simple_Flow_SF", s.Test_Complete_Simple_Flow_SF}, - {"Test_Complete_Simple_Flow_SF_Avro_CDC", s.Test_Complete_Simple_Flow_SF_Avro_CDC}, {"Test_Invalid_Geo_SF_Avro_CDC", s.Test_Invalid_Geo_SF_Avro_CDC}, {"Test_Toast_SF", s.Test_Toast_SF}, {"Test_Toast_Nochanges_SF", s.Test_Toast_Nochanges_SF}, @@ -49,7 +48,6 @@ {"Test_Toast_Advance_2_SF", s.Test_Toast_Advance_2_SF}, {"Test_Toast_Advance_3_SF", s.Test_Toast_Advance_3_SF}, {"Test_Types_SF", s.Test_Types_SF}, - {"Test_Types_SF_Avro_CDC", s.Test_Types_SF_Avro_CDC}, {"Test_Multi_Table_SF", s.Test_Multi_Table_SF}, {"Test_Simple_Schema_Changes_SF", s.Test_Simple_Schema_Changes_SF}, {"Test_Composite_PKey_SF", s.Test_Composite_PKey_SF}, @@ -171,6 +169,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -182,11 +181,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 10 rows into the source table + // and then insert 20 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 10 rows into the source table - for i := 0; i < 10; i++ { + // insert 20 rows into the source table + for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -209,7 +208,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { count, err := s.sfHelper.CountRows("test_simple_flow_sf") require.NoError(t, err) - s.Equal(10, count) + s.Equal(20, count) // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago // it should match the count. 
@@ -218,81 +217,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { `, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(t, err) - s.Equal(10, numNewRows) - - // TODO: verify that the data is correctly synced to the destination table - // on the bigquery side - - env.AssertExpectations(s.T()) -} - -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) { - t.Parallel() - - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) - - tblConst := "test_simple_flow_sf_avro_cdc" - srcTableName := s.attachSchemaSuffix(tblConst) - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblConst) - - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - require.NoError(t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow_avro"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, - Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - } - - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) - - limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, - } - - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 10 rows into the source table - go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 10 rows into the source table - for i := 0; i < 15; i++ { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - require.NoError(t, err) - } - fmt.Println("Inserted 15 rows into the source table") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") - - count, err := s.sfHelper.CountRows(tblConst) - require.NoError(t, err) - s.Equal(15, count) + s.Equal(20, numNewRows) // TODO: verify that the data is correctly synced to the destination table - // on the bigquery side + // on the Snowflake side env.AssertExpectations(s.T()) } @@ -410,13 +338,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -482,13 +411,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, 
MaxBatchSize: 100, } @@ -546,6 +476,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -622,13 +553,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -693,13 +625,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -759,86 +692,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { `, srcTableName, srcTableName)) require.NoError(t, err) - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_types_sf"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, - Destination: s.sfHelper.Peer, - } - - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) - - limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 100, - } - - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - /* test inserting various types*/ - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s SELECT 2,2,b'1',b'101', - true,random_bytea(32),'s','test','1.1.10.2'::cidr, - CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, - '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, - '{"sai":1}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, - 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, - 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, - txid_current_snapshot(), - '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), - 'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', - 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', - 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; - `, srcTableName)) - require.NoError(t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") - - noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", - "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", - 
"c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) - if err != nil { - fmt.Println("error %w", err) - } - // Make sure that there are no nulls - s.Equal(noNulls, true) - - env.AssertExpectations(s.T()) -} - -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { - t.Parallel() - - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) - - srcTableName := s.attachSchemaSuffix("test_types_sf_avro_cdc") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf_avro_cdc") - - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, - c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, - c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, - c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, - c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), - c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); - `, srcTableName)) - require.NoError(t, err) - connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -851,7 +704,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -887,7 +740,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { s.Error(err) s.Contains(err.Error(), "continue as new") - noNulls, err := s.sfHelper.CheckNull("test_types_sf_avro_cdc", []string{"c41", "c1", "c2", "c3", "c4", + noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) @@ -922,13 +775,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -983,6 +837,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1149,6 +1004,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1226,6 
+1082,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() @@ -1305,6 +1162,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 82901beac2..c1516ca8df 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -81,7 +81,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { ) s.NoError(err) - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -126,7 +126,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() } s.NoError(err) - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -168,7 +168,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { s.NoError(err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -212,7 +212,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { qrepConfig.WatermarkColumn = "xmin" s.NoError(err) - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -256,7 +256,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( s.NoError(err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 9c2b27bcb0..6a70377ac4 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -168,7 +168,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append WaitBetweenBatchesSeconds: 5, } - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index f26afc2ee1..d307039cdd 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -294,14 +294,21 @@ func CreateQRepWorkflowConfig( return nil, err } - qrepConfig.InitialCopyOnly = true - return qrepConfig, nil } -func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) { - state 
:= peerflow.NewQRepFlowState() +func RunQrepFlowWorkflow(suite testsuite.WorkflowTestSuite, config *protos.QRepConfig) bool { + env := suite.NewTestWorkflowEnvironment() + RegisterWorkflowsAndActivities(env) + state := peerflow.NewQRepFlowStateForTesting() + env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state) + if !env.IsWorkflowCompleted() { + return false + } + env = suite.NewTestWorkflowEnvironment() + RegisterWorkflowsAndActivities(env) env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state) + return env.IsWorkflowCompleted() } func GetOwnersSchema() *model.QRecordSchema { diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index 0b73e4393f..027d899a28 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -2627,8 +2627,8 @@ type QRepConfig struct { // This is only used when sync_mode is AVRO // this is the location where the avro files will be written // if this starts with gs:// then it will be written to GCS - // if this starts with s3:// then it will be written to S3 - // if nothing is specified then it will be written to local disk, only supported in Snowflake + // if this starts with s3:// then it will be written to S3, only supported in Snowflake + // if nothing is specified then it will be written to local disk // if using GCS or S3 make sure your instance has the correct permissions. StagingPath string `protobuf:"bytes,15,opt,name=staging_path,json=stagingPath,proto3" json:"staging_path,omitempty"` // This setting overrides batch_size_int and batch_duration_seconds @@ -3193,6 +3193,7 @@ type QRepFlowState struct { LastPartition *QRepPartition `protobuf:"bytes,1,opt,name=last_partition,json=lastPartition,proto3" json:"last_partition,omitempty"` NumPartitionsProcessed uint64 `protobuf:"varint,2,opt,name=num_partitions_processed,json=numPartitionsProcessed,proto3" json:"num_partitions_processed,omitempty"` NeedsResync bool `protobuf:"varint,3,opt,name=needs_resync,json=needsResync,proto3" json:"needs_resync,omitempty"` + DisableWaitForNewRows bool `protobuf:"varint,4,opt,name=disable_wait_for_new_rows,json=disableWaitForNewRows,proto3" json:"disable_wait_for_new_rows,omitempty"` } func (x *QRepFlowState) Reset() { @@ -3248,6 +3249,13 @@ func (x *QRepFlowState) GetNeedsResync() bool { return false } +func (x *QRepFlowState) GetDisableWaitForNewRows() bool { + if x != nil { + return x.DisableWaitForNewRows + } + return false +} + var File_flow_proto protoreflect.FileDescriptor var file_flow_proto_rawDesc = []byte{ @@ -3894,7 +3902,7 @@ var file_flow_proto_rawDesc = []byte{ 0x64, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x52, 0x11, 0x74, 0x61, 0x62, - 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x22, 0xaf, + 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x22, 0xe9, 0x01, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x41, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, @@ -3906,26 +3914,30 @@ var file_flow_proto_rawDesc = []byte{ 0x69, 0x6f, 0x6e, 0x73, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x6e, 0x65, 
0x65, 0x64, 0x73, 0x5f, 0x72, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x6e, 0x65, 0x65, 0x64, 0x73, 0x52, 0x65, 0x73, 0x79, 0x6e, 0x63, - 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, - 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, - 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, - 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, - 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, - 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, - 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, - 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, - 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, - 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, - 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, - 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, 0x54, 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, - 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, - 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, - 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, - 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, - 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, - 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, - 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x38, 0x0a, 0x19, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x77, 0x61, 0x69, 0x74, + 0x5f, 0x66, 0x6f, 0x72, 0x5f, 0x6e, 0x65, 0x77, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x15, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x57, 0x61, 0x69, 0x74, + 0x46, 0x6f, 0x72, 0x4e, 0x65, 0x77, 0x52, 0x6f, 0x77, 0x73, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, + 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, + 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, + 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, + 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, + 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, + 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, + 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, + 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, + 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, + 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, + 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 
0x57, 0x52, 0x49,
+	0x54, 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72,
+	0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f,
+	0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50,
+	0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72,
+	0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46,
+	0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea,
+	0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72,
+	0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
diff --git a/flow/go.mod b/flow/go.mod
index b31c7efe2a..b9f1e4e634 100644
--- a/flow/go.mod
+++ b/flow/go.mod
@@ -19,7 +19,7 @@ require (
 	github.com/jackc/pgx/v5 v5.5.0
 	github.com/jmoiron/sqlx v1.3.5
 	github.com/joho/godotenv v1.5.1
-	github.com/klauspost/compress v1.17.2
+	github.com/klauspost/compress v1.17.3
 	github.com/lib/pq v1.10.9
 	github.com/linkedin/goavro/v2 v2.12.0
 	github.com/microsoft/go-mssqldb v1.6.0
diff --git a/flow/go.sum b/flow/go.sum
index 47c9799543..fab59374f2 100644
--- a/flow/go.sum
+++ b/flow/go.sum
@@ -289,6 +289,8 @@ github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK
 github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
 github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
 github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
+github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
 github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
 github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index f20d17951e..3b8e77a686 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -34,7 +34,7 @@ type QRepPartitionFlowExecution struct {
 	runUUID string
 }
 
-// returns a new empty PeerFlowState
+// returns a new empty QRepFlowState
 func NewQRepFlowState() *protos.QRepFlowState {
 	return &protos.QRepFlowState{
 		LastPartition: &protos.QRepPartition{
@@ -46,6 +46,19 @@
 	}
 }
 
+// returns a new QRepFlowState for testing, with DisableWaitForNewRows set
+func NewQRepFlowStateForTesting() *protos.QRepFlowState {
+	return &protos.QRepFlowState{
+		LastPartition: &protos.QRepPartition{
+			PartitionId: "not-applicable-partition",
+			Range:       nil,
+		},
+		NumPartitionsProcessed: 0,
+		NeedsResync:            true,
+		DisableWaitForNewRows:  true,
+	}
+}
+
 // NewQRepFlowExecution creates a new instance of QRepFlowExecution.
 func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution {
 	return &QRepFlowExecution{
@@ -440,10 +453,12 @@ func QRepFlowWorkflow(
 		state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1]
 	}
 
-	// sleep for a while and continue the workflow
-	err = q.waitForNewRows(ctx, state.LastPartition)
-	if err != nil {
-		return err
+	if !state.DisableWaitForNewRows {
+		// sleep for a while and continue the workflow
+		err = q.waitForNewRows(ctx, state.LastPartition)
+		if err != nil {
+			return err
+		}
 	}
 
 	workflow.GetLogger(ctx).Info("Continuing as new workflow",
diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs
index 9fab82f84c..b4375725e3 100644
--- a/nexus/pt/src/peerdb_flow.rs
+++ b/nexus/pt/src/peerdb_flow.rs
@@ -449,8 +449,8 @@ pub struct QRepConfig {
     /// This is only used when sync_mode is AVRO
     /// this is the location where the avro files will be written
     /// if this starts with gs:// then it will be written to GCS
-    /// if this starts with s3:// then it will be written to S3
-    /// if nothing is specified then it will be written to local disk, only supported in Snowflake
+    /// if this starts with s3:// then it will be written to S3, only supported in Snowflake
+    /// if nothing is specified then it will be written to local disk
     /// if using GCS or S3 make sure your instance has the correct permissions.
     #[prost(string, tag="15")]
     pub staging_path: ::prost::alloc::string::String,
@@ -532,6 +532,8 @@ pub struct QRepFlowState {
     pub num_partitions_processed: u64,
     #[prost(bool, tag="3")]
     pub needs_resync: bool,
+    #[prost(bool, tag="4")]
+    pub disable_wait_for_new_rows: bool,
 }
 /// protos for qrep
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs
index 5dc8fa2ade..0df0f61f0b 100644
--- a/nexus/pt/src/peerdb_flow.serde.rs
+++ b/nexus/pt/src/peerdb_flow.serde.rs
@@ -3004,6 +3004,9 @@ impl serde::Serialize for QRepFlowState {
         if self.needs_resync {
             len += 1;
         }
+        if self.disable_wait_for_new_rows {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("peerdb_flow.QRepFlowState", len)?;
         if let Some(v) = self.last_partition.as_ref() {
             struct_ser.serialize_field("lastPartition", v)?;
@@ -3014,6 +3017,9 @@ impl serde::Serialize for QRepFlowState {
         if self.needs_resync {
             struct_ser.serialize_field("needsResync", &self.needs_resync)?;
         }
+        if self.disable_wait_for_new_rows {
+            struct_ser.serialize_field("disableWaitForNewRows", &self.disable_wait_for_new_rows)?;
+        }
         struct_ser.end()
     }
 }
@@ -3030,6 +3036,8 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState {
             "numPartitionsProcessed",
             "needs_resync",
             "needsResync",
+            "disable_wait_for_new_rows",
+            "disableWaitForNewRows",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -3037,6 +3045,7 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState {
             LastPartition,
             NumPartitionsProcessed,
             NeedsResync,
+            DisableWaitForNewRows,
             __SkipField__,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -3062,6 +3071,7 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState {
                             "lastPartition" | "last_partition" => Ok(GeneratedField::LastPartition),
                             "numPartitionsProcessed" | "num_partitions_processed" => Ok(GeneratedField::NumPartitionsProcessed),
                             "needsResync" | "needs_resync" => Ok(GeneratedField::NeedsResync),
+                            "disableWaitForNewRows" | "disable_wait_for_new_rows" => Ok(GeneratedField::DisableWaitForNewRows),
                             _ => Ok(GeneratedField::__SkipField__),
                         }
                     }
@@ -3084,6 +3094,7 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState {
                 let mut last_partition__ = None;
                 let mut num_partitions_processed__ = None;
                 let mut needs_resync__ = None;
+                let mut disable_wait_for_new_rows__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
                         GeneratedField::LastPartition => {
@@ -3106,6 +3117,12 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState {
                             }
                             needs_resync__ = Some(map.next_value()?);
                         }
+                        GeneratedField::DisableWaitForNewRows => {
+                            if disable_wait_for_new_rows__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("disableWaitForNewRows"));
+                            }
+                            disable_wait_for_new_rows__ = Some(map.next_value()?);
+                        }
                         GeneratedField::__SkipField__ => {
                             let _ = map.next_value::()?;
                         }
@@ -3115,6 +3132,7 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState {
                     last_partition: last_partition__,
                     num_partitions_processed: num_partitions_processed__.unwrap_or_default(),
                     needs_resync: needs_resync__.unwrap_or_default(),
+                    disable_wait_for_new_rows: disable_wait_for_new_rows__.unwrap_or_default(),
                 })
             }
         }
diff --git a/protos/flow.proto b/protos/flow.proto
index 19a804e05c..5444080384 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -303,8 +303,8 @@ message QRepConfig {
   // This is only used when sync_mode is AVRO
   // this is the location where the avro files will be written
   // if this starts with gs:// then it will be written to GCS
-  // if this starts with s3:// then it will be written to S3
-  // if nothing is specified then it will be written to local disk, only supported in Snowflake
+  // if this starts with s3:// then it will be written to S3, only supported in Snowflake
+  // if nothing is specified then it will be written to local disk
   // if using GCS or S3 make sure your instance has the correct permissions.
   string staging_path = 15;
 
@@ -360,4 +360,5 @@ message QRepFlowState {
   QRepPartition last_partition = 1;
   uint64 num_partitions_processed = 2;
   bool needs_resync = 3;
+  bool disable_wait_for_new_rows = 4;
 }
diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts
index 6e1cbdb177..3e981a0b8f 100644
--- a/ui/grpc_generated/flow.ts
+++ b/ui/grpc_generated/flow.ts
@@ -412,8 +412,8 @@ export interface QRepConfig {
    * This is only used when sync_mode is AVRO
    * this is the location where the avro files will be written
    * if this starts with gs:// then it will be written to GCS
-   * if this starts with s3:// then it will be written to S3
-   * if nothing is specified then it will be written to local disk, only supported in Snowflake
+   * if this starts with s3:// then it will be written to S3, only supported in Snowflake
+   * if nothing is specified then it will be written to local disk
    * if using GCS or S3 make sure your instance has the correct permissions.
    */
   stagingPath: string;
@@ -471,6 +471,7 @@ export interface QRepFlowState {
   lastPartition: QRepPartition | undefined;
   numPartitionsProcessed: number;
   needsResync: boolean;
+  disableWaitForNewRows: boolean;
 }
 
 function createBaseTableNameMapping(): TableNameMapping {
@@ -6083,7 +6084,7 @@ export const ReplayTableSchemaDeltaInput = {
 };
 
 function createBaseQRepFlowState(): QRepFlowState {
-  return { lastPartition: undefined, numPartitionsProcessed: 0, needsResync: false };
+  return { lastPartition: undefined, numPartitionsProcessed: 0, needsResync: false, disableWaitForNewRows: false };
 }
 
 export const QRepFlowState = {
@@ -6097,6 +6098,9 @@ export const QRepFlowState = {
     if (message.needsResync === true) {
       writer.uint32(24).bool(message.needsResync);
     }
+    if (message.disableWaitForNewRows === true) {
+      writer.uint32(32).bool(message.disableWaitForNewRows);
+    }
     return writer;
   },
 
@@ -6128,6 +6132,13 @@
           message.needsResync = reader.bool();
           continue;
+        case 4:
+          if (tag !== 32) {
+            break;
+          }
+
+          message.disableWaitForNewRows = reader.bool();
+          continue;
       }
       if ((tag & 7) === 4 || tag === 0) {
         break;
@@ -6142,6 +6153,7 @@
       lastPartition: isSet(object.lastPartition) ? QRepPartition.fromJSON(object.lastPartition) : undefined,
       numPartitionsProcessed: isSet(object.numPartitionsProcessed) ? Number(object.numPartitionsProcessed) : 0,
       needsResync: isSet(object.needsResync) ? Boolean(object.needsResync) : false,
+      disableWaitForNewRows: isSet(object.disableWaitForNewRows) ? Boolean(object.disableWaitForNewRows) : false,
     };
   },
 
@@ -6156,6 +6168,9 @@
     if (message.needsResync === true) {
       obj.needsResync = message.needsResync;
     }
+    if (message.disableWaitForNewRows === true) {
+      obj.disableWaitForNewRows = message.disableWaitForNewRows;
+    }
     return obj;
   },
 
@@ -6169,6 +6184,7 @@
       : undefined;
     message.numPartitionsProcessed = object.numPartitionsProcessed ?? 0;
     message.needsResync = object.needsResync ?? false;
+    message.disableWaitForNewRows = object.disableWaitForNewRows ?? false;
     return message;
   },
 };
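
Note for reviewers (not part of the diff): the new disable_wait_for_new_rows field exists so QRepFlowWorkflow can be driven to completion in tests without blocking in waitForNewRows. Below is a minimal sketch of how the RunQrepFlowWorkflow helper added above might be called; the package name, test name, and the empty QRepConfig are illustrative assumptions, not part of this change.

package e2e // package name assumed; placed alongside the RunQrepFlowWorkflow helper above

import (
	"testing"

	"github.com/PeerDB-io/peer-flow/generated/protos"
	"go.temporal.io/sdk/testsuite"
)

// TestQRepFlowCompletesWithoutWaiting is a hypothetical test. Because
// NewQRepFlowStateForTesting sets DisableWaitForNewRows, each ExecuteWorkflow
// call inside RunQrepFlowWorkflow skips waitForNewRows and reaches
// continue-as-new instead of sleeping, so IsWorkflowCompleted is the check
// the helper relies on (continue-as-new still counts as a completed run).
func TestQRepFlowCompletesWithoutWaiting(t *testing.T) {
	var suite testsuite.WorkflowTestSuite

	// A real test would populate source/destination peers, the query, and
	// watermark settings; an empty config is used here only as a placeholder.
	config := &protos.QRepConfig{}

	if !RunQrepFlowWorkflow(suite, config) {
		t.Fatal("QRepFlowWorkflow did not complete in the test environment")
	}
}

Running the workflow twice, as the helper does, exercises the continue-as-new path with LastPartition carried over in the shared state rather than waiting for new rows between runs.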