diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index e5df71cbf..3085d14df 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -120,4 +120,4 @@ jobs: PEERDB_CATALOG_USER: postgres PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres - PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3 + PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10 diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 776b6cc82..c216ac523 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -1242,20 +1242,13 @@ func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { // grab an advisory lock based on the mirror jobs table hash mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) - _, err = tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl) - + _, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl) if err != nil { err = tx.Rollback(c.ctx) return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) } return func() error { - // release the lock - _, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl) - if err != nil { - return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err) - } - err = tx.Commit(c.ctx) if err != nil { return fmt.Errorf("failed to commit transaction: %w", err) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 8cb8af79c..97c043ac0 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -1,19 +1,18 @@ package connbigquery import ( - "bytes" - "context" "encoding/json" "fmt" + "os" "strings" "time" "cloud.google.com/go/bigquery" "github.com/PeerDB-io/peer-flow/connectors/utils" + avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/linkedin/goavro/v2" log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" ) @@ -44,13 +43,13 @@ func (s *QRepAvroSyncMethod) SyncRecords( flowJobName, dstTableName, syncBatchID), ) // You will need to define your Avro schema as a string - avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata) + avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata) if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, fmt.Sprint(syncBatchID)) - numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream, nullable) + numRecords, err := s.writeToStage(fmt.Sprint(syncBatchID), dstTableName, avroSchema, stagingTable, stream) if err != nil { return -1, fmt.Errorf("failed to push to avro stage: %v", err) } @@ -106,7 +105,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( startTime := time.Now() // You will need to define your Avro schema as a string - avroSchema, nullable, err := DefineAvroSchema(dstTableName, dstTableMetadata) + avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata) if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } @@ -114,10 +113,12 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( "flowName": flowJobName, }).Infof("Obtained Avro schema for destination table %s and partition ID %s", dstTableName, partition.PartitionId) - fmt.Printf("Avro schema: %s\n", avroSchema) + log.WithFields(log.Fields{ + "flowName": 
flowJobName, + }).Infof("Avro schema: %v\n", avroSchema) // create a staging table name with partitionID replace hyphens with underscores stagingTable := fmt.Sprintf("%s_%s_staging", dstTableName, strings.ReplaceAll(partition.PartitionId, "-", "_")) - numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream, nullable) + numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema, stagingTable, stream) if err != nil { return -1, fmt.Errorf("failed to push to avro stage: %v", err) } @@ -182,14 +183,15 @@ type AvroSchema struct { Fields []AvroField `json:"fields"` } -func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetadata) (string, map[string]bool, error) { +func DefineAvroSchema(dstTableName string, + dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) { avroFields := []AvroField{} nullableFields := map[string]bool{} for _, bqField := range dstTableMetadata.Schema { avroType, err := GetAvroType(bqField) if err != nil { - return "", nil, err + return nil, err } // If a field is nullable, its Avro type should be ["null", actualType] @@ -212,10 +214,13 @@ func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetad avroSchemaJSON, err := json.Marshal(avroSchema) if err != nil { - return "", nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err) + return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err) } - return string(avroSchemaJSON), nullableFields, nil + return &model.QRecordAvroSchemaDefinition{ + Schema: string(avroSchemaJSON), + NullableFields: nullableFields, + }, nil } func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) { @@ -306,10 +311,9 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) { func (s *QRepAvroSyncMethod) writeToStage( syncID string, objectFolder string, - avroSchema string, + avroSchema *model.QRecordAvroSchemaDefinition, stagingTable string, stream *model.QRecordStream, - nullable map[string]bool, ) (int, error) { shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute, func() string { @@ -320,95 +324,75 @@ func (s *QRepAvroSyncMethod) writeToStage( defer func() { shutdown <- true }() - ctx := context.Background() - bucket := s.connector.storageClient.Bucket(s.gcsBucket) - gcsObjectName := fmt.Sprintf("%s/%s.avro", objectFolder, syncID) - - obj := bucket.Object(gcsObjectName) - w := obj.NewWriter(ctx) - - // Create OCF Writer - var ocfFileContents bytes.Buffer - ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{ - W: &ocfFileContents, - Schema: avroSchema, - }) - if err != nil { - return 0, fmt.Errorf("failed to create OCF writer: %w", err) - } - schema, err := stream.Schema() - if err != nil { - log.WithFields(log.Fields{ - "partitonOrBatchID": syncID, - }).Errorf("failed to get schema from stream: %v", err) - return 0, fmt.Errorf("failed to get schema from stream: %w", err) - } + var avroFilePath string + numRecords, err := func() (int, error) { + ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, + avro.CompressNone, qvalue.QDWHTypeBigQuery) + if s.gcsBucket != "" { + bucket := s.connector.storageClient.Bucket(s.gcsBucket) + avroFilePath = fmt.Sprintf("%s/%s.avro", objectFolder, syncID) + obj := bucket.Object(avroFilePath) + w := obj.NewWriter(s.connector.ctx) + + numRecords, err := ocfWriter.WriteOCF(w) + if err != nil { + return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err) + } + return 
numRecords, err + } else { + tmpDir, err := os.MkdirTemp("", "peerdb-avro") + if err != nil { + return 0, fmt.Errorf("failed to create temp dir: %w", err) + } - activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf( - "Obtained staging bucket %s and schema of rows. Now writing records to OCF file.", - gcsObjectName), - ) - numRecords := 0 - // Write each QRecord to the OCF file - for qRecordOrErr := range stream.Records { - if numRecords > 0 && numRecords%10000 == 0 { - activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf( - "Written %d records to OCF file for staging bucket %s.", - numRecords, gcsObjectName), - ) - } - if qRecordOrErr.Err != nil { + avroFilePath = fmt.Sprintf("%s/%s.avro", tmpDir, syncID) log.WithFields(log.Fields{ "batchOrPartitionID": syncID, - }).Errorf("[bq_avro] failed to get record from stream: %v", qRecordOrErr.Err) - return 0, fmt.Errorf("[bq_avro] failed to get record from stream: %w", qRecordOrErr.Err) - } - - qRecord := qRecordOrErr.Record - avroConverter := model.NewQRecordAvroConverter( - qRecord, - qvalue.QDWHTypeBigQuery, - &nullable, - schema.GetColumnNames(), - ) - avroMap, err := avroConverter.Convert() - if err != nil { - return 0, fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err) - } - - err = ocfWriter.Append([]interface{}{avroMap}) - if err != nil { - return 0, fmt.Errorf("failed to write record to OCF file: %w", err) + }).Infof("writing records to local file %s", avroFilePath) + numRecords, err := ocfWriter.WriteRecordsToAvroFile(avroFilePath) + if err != nil { + return 0, fmt.Errorf("failed to write records to local Avro file: %w", err) + } + return numRecords, err } - numRecords++ - } - activity.RecordHeartbeat(s.connector.ctx, fmt.Sprintf( - "Writing OCF contents to BigQuery for partition/batch ID %s", - syncID), - ) - // Write OCF contents to GCS - if _, err = w.Write(ocfFileContents.Bytes()); err != nil { - return 0, fmt.Errorf("failed to write OCF file to GCS: %w", err) + }() + if err != nil { + return 0, err } - - if err := w.Close(); err != nil { - return 0, fmt.Errorf("failed to close GCS object writer: %w", err) + if numRecords == 0 { + return 0, nil } + log.WithFields(log.Fields{ + "batchOrPartitionID": syncID, + }).Infof("wrote %d records to file %s", numRecords, avroFilePath) - // write this file to bigquery - gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, gcsObjectName)) - gcsRef.SourceFormat = bigquery.Avro bqClient := s.connector.client datasetID := s.connector.datasetID - loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(gcsRef) + var avroRef bigquery.LoadSource + if s.gcsBucket != "" { + gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath)) + gcsRef.SourceFormat = bigquery.Avro + gcsRef.Compression = bigquery.Deflate + avroRef = gcsRef + } else { + fh, err := os.Open(avroFilePath) + if err != nil { + return 0, fmt.Errorf("failed to read local Avro file: %w", err) + } + localRef := bigquery.NewReaderSource(fh) + localRef.SourceFormat = bigquery.Avro + avroRef = localRef + } + + loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(avroRef) loader.UseAvroLogicalTypes = true - job, err := loader.Run(ctx) + job, err := loader.Run(s.connector.ctx) if err != nil { return 0, fmt.Errorf("failed to run BigQuery load job: %w", err) } - status, err := job.Wait(ctx) + status, err := job.Wait(s.connector.ctx) if err != nil { return 0, fmt.Errorf("failed to wait for BigQuery load job: %w", err) } @@ -417,6 +401,6 @@ func (s 
*QRepAvroSyncMethod) writeToStage( return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err) } log.Printf("Pushed into %s/%s", - gcsObjectName, syncID) + avroFilePath, syncID) return numRecords, nil } diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index b34f9a2cf..1f1cf881d 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -7,6 +7,7 @@ import ( avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" log "github.com/sirupsen/logrus" ) @@ -62,7 +63,7 @@ func (c *S3Connector) writeToAvroFile( } s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID) - writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema) + writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake) numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds) if err != nil { return 0, fmt.Errorf("failed to write records to S3: %w", err) diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 77310c45d..76b70f478 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -1,6 +1,7 @@ package connsnowflake import ( + "context" "fmt" "math/big" "os" @@ -142,7 +143,64 @@ func TestWriteRecordsToAvroFileHappyPath(t *testing.T) { fmt.Printf("[test] avroSchema: %v\n", avroSchema) // Call function - writer := avro.NewPeerDBOCFWriter(nil, records, avroSchema) + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake) + _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) + require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") + + // Check file is not empty + info, err := tmpfile.Stat() + require.NoError(t, err) + require.NotZero(t, info.Size(), "expected file to not be empty") +} + +func TestWriteRecordsToZstdAvroFileHappyPath(t *testing.T) { + // Create temporary file + tmpfile, err := os.CreateTemp("", "example_*.avro.zst") + require.NoError(t, err) + + defer os.Remove(tmpfile.Name()) // clean up + defer tmpfile.Close() // close file after test ends + + // Define sample data + records, schema := generateRecords(t, true, 10, false) + + avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) + require.NoError(t, err) + + fmt.Printf("[test] avroSchema: %v\n", avroSchema) + + // Call function + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressZstd, qvalue.QDWHTypeSnowflake) + _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) + require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") + + // Check file is not empty + info, err := tmpfile.Stat() + require.NoError(t, err) + require.NotZero(t, info.Size(), "expected file to not be empty") +} + +func TestWriteRecordsToDeflateAvroFileHappyPath(t *testing.T) { + // Create temporary file + tmpfile, err := os.CreateTemp("", "example_*.avro.zz") + require.NoError(t, err) + + defer os.Remove(tmpfile.Name()) // clean up + defer tmpfile.Close() // close file after test ends + + // Define sample data + records, schema := generateRecords(t, true, 10, false) + + avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) + require.NoError(t, err) + + fmt.Printf("[test] 
avroSchema: %v\n", avroSchema) + + // Call function + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressDeflate, qvalue.QDWHTypeSnowflake) _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") @@ -168,7 +226,8 @@ func TestWriteRecordsToAvroFileNonNull(t *testing.T) { fmt.Printf("[test] avroSchema: %v\n", avroSchema) // Call function - writer := avro.NewPeerDBOCFWriter(nil, records, avroSchema) + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake) _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") @@ -195,7 +254,8 @@ func TestWriteRecordsToAvroFileAllNulls(t *testing.T) { fmt.Printf("[test] avroSchema: %v\n", avroSchema) // Call function - writer := avro.NewPeerDBOCFWriter(nil, records, avroSchema) + writer := avro.NewPeerDBOCFWriter(context.Background(), + records, avroSchema, avro.CompressNone, qvalue.QDWHTypeSnowflake) _, err = writer.WriteRecordsToAvroFile(tmpfile.Name()) require.NoError(t, err, "expected WriteRecordsToAvroFile to complete without errors") diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index b41e97a19..7d540c9f2 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -274,7 +274,8 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile( ) (int, string, error) { var numRecords int if s.config.StagingPath == "" { - ocfWriter := avro.NewPeerDBOCFWriterWithCompression(s.connector.ctx, stream, avroSchema) + ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd, + qvalue.QDWHTypeSnowflake) tmpDir, err := os.MkdirTemp("", "peerdb-avro") if err != nil { return 0, "", fmt.Errorf("failed to create temp dir: %w", err) @@ -292,13 +293,14 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile( return numRecords, localFilePath, nil } else if strings.HasPrefix(s.config.StagingPath, "s3://") { - ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema) + ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd, + qvalue.QDWHTypeSnowflake) s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath) if err != nil { return 0, "", fmt.Errorf("failed to parse staging path: %w", err) } - s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, s.config.FlowJobName, partitionID) + s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, s.config.FlowJobName, partitionID) log.WithFields(log.Fields{ "flowName": flowJobName, "partitionID": partitionID, diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 0b4cf09d7..36c8858aa 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -13,56 +13,67 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/klauspost/compress/flate" + "github.com/klauspost/compress/snappy" "github.com/klauspost/compress/zstd" "github.com/linkedin/goavro/v2" log "github.com/sirupsen/logrus" uber_atomic "go.uber.org/atomic" ) +type AvroCompressionCodec int64 + +const ( + CompressNone AvroCompressionCodec = iota + CompressZstd + CompressDeflate + CompressSnappy +) + type PeerDBOCFWriter struct { - ctx context.Context - stream 
*model.QRecordStream - avroSchema *model.QRecordAvroSchemaDefinition - compress bool - writer io.WriteCloser + ctx context.Context + stream *model.QRecordStream + avroSchema *model.QRecordAvroSchemaDefinition + avroCompressionCodec AvroCompressionCodec + writer io.WriteCloser + targetDWH qvalue.QDWHType } func NewPeerDBOCFWriter( ctx context.Context, stream *model.QRecordStream, avroSchema *model.QRecordAvroSchemaDefinition, + avroCompressionCodec AvroCompressionCodec, + targetDWH qvalue.QDWHType, ) *PeerDBOCFWriter { return &PeerDBOCFWriter{ - ctx: ctx, - stream: stream, - avroSchema: avroSchema, - compress: false, - } -} - -func NewPeerDBOCFWriterWithCompression( - ctx context.Context, - stream *model.QRecordStream, - avroSchema *model.QRecordAvroSchemaDefinition, -) *PeerDBOCFWriter { - return &PeerDBOCFWriter{ - ctx: ctx, - stream: stream, - avroSchema: avroSchema, - compress: true, + ctx: ctx, + stream: stream, + avroSchema: avroSchema, + avroCompressionCodec: avroCompressionCodec, + targetDWH: targetDWH, } } func (p *PeerDBOCFWriter) initWriteCloser(w io.Writer) error { var err error - if p.compress { + switch p.avroCompressionCodec { + case CompressNone: + p.writer = &nopWriteCloser{w} + case CompressZstd: p.writer, err = zstd.NewWriter(w) if err != nil { return fmt.Errorf("error while initializing zstd encoding writer: %w", err) } - } else { - p.writer = &nopWriteCloser{w} + case CompressDeflate: + p.writer, err = flate.NewWriter(w, -1) + if err != nil { + return fmt.Errorf("error while initializing deflate encoding writer: %w", err) + } + case CompressSnappy: + p.writer = snappy.NewBufferedWriter(w) } + return nil } @@ -115,7 +126,7 @@ func (p *PeerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) ( qRecord := qRecordOrErr.Record avroConverter := model.NewQRecordAvroConverter( qRecord, - qvalue.QDWHTypeSnowflake, + p.targetDWH, &p.avroSchema.NullableFields, colNames, ) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index be7f45ef4..90fbb552d 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "testing" + "time" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -28,43 +29,7 @@ type PeerFlowE2ETestSuiteBQ struct { } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - s := &PeerFlowE2ETestSuiteBQ{} - s.SetT(t) - s.SetupSuite() - - tests := []struct { - name string - test func(t *testing.T) - }{ - {"Test_Invalid_Connection_Config", s.Test_Invalid_Connection_Config}, - {"Test_Complete_Flow_No_Data", s.Test_Complete_Flow_No_Data}, - {"Test_Char_ColType_Error", s.Test_Char_ColType_Error}, - {"Test_Complete_Simple_Flow_BQ", s.Test_Complete_Simple_Flow_BQ}, - {"Test_Toast_BQ", s.Test_Toast_BQ}, - {"Test_Toast_Nochanges_BQ", s.Test_Toast_Nochanges_BQ}, - {"Test_Toast_Advance_1_BQ", s.Test_Toast_Advance_1_BQ}, - {"Test_Toast_Advance_2_BQ", s.Test_Toast_Advance_2_BQ}, - {"Test_Toast_Advance_3_BQ", s.Test_Toast_Advance_3_BQ}, - {"Test_Types_BQ", s.Test_Types_BQ}, - {"Test_Types_Avro_BQ", s.Test_Types_Avro_BQ}, - {"Test_Simple_Flow_BQ_Avro_CDC", s.Test_Simple_Flow_BQ_Avro_CDC}, - {"Test_Multi_Table_BQ", s.Test_Multi_Table_BQ}, - {"Test_Simple_Schema_Changes_BQ", s.Test_Simple_Schema_Changes_BQ}, - {"Test_Composite_PKey_BQ", s.Test_Composite_PKey_BQ}, - {"Test_Composite_PKey_Toast_1_BQ", s.Test_Composite_PKey_Toast_1_BQ}, - {"Test_Composite_PKey_Toast_2_BQ", s.Test_Composite_PKey_Toast_2_BQ}, - } - - // Assert 
that there are no duplicate test names - testNames := make(map[string]bool) - for _, tt := range tests { - if testNames[tt.name] { - t.Fatalf("duplicate test name: %s", tt.name) - } - testNames[tt.name] = true - - t.Run(tt.name, tt.test) - } + suite.Run(t, new(PeerFlowE2ETestSuiteBQ)) } func (s *PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { @@ -113,7 +78,9 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { s.setupTemporalLogger() - s.bqSuffix = strings.ToLower(util.RandomString(8)) + suffix := util.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + s.bqSuffix = fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) pool, err := e2e.SetupPostgres(s.bqSuffix) if err != nil { s.Fail("failed to setup postgres", err) @@ -139,8 +106,7 @@ func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { } } -func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -163,8 +129,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -178,17 +143,19 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { value VARCHAR(255) NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_flow_no_data"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -208,8 +175,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -223,17 +189,19 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { value CHAR(255) NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_char_table"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -256,8 +224,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { // Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. 
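Note on the test-runner change above: replacing the hand-rolled `tests` slice with `suite.Run` means each test becomes a method on the suite, drops its `*testing.T` parameter and `t.Parallel()` call, and asserts through the suite itself (`s.NoError`, `s.Error`, ...). A minimal sketch of that pattern, assuming `github.com/stretchr/testify/suite` (the `doWork` helper is purely illustrative):

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/suite"
)

type ExampleTestSuite struct {
	suite.Suite
}

// Test methods hang off the suite receiver, take no *testing.T parameter, and
// use the suite's embedded assertions instead of require with an explicit t.
func (s *ExampleTestSuite) Test_Something() {
	err := doWork() // hypothetical stand-in for the test's setup and queries
	s.NoError(err)
}

func doWork() error { return nil }

// The top-level entry point registers the whole suite with `go test`.
func TestExampleTestSuite(t *testing.T) {
	suite.Run(t, new(ExampleTestSuite))
}
```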
-func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -271,17 +238,19 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { value TEXT NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_simple_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -299,7 +268,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -315,7 +284,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { s.Contains(err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - require.NoError(t, err) + s.NoError(err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -324,8 +293,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -340,20 +308,22 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_1"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -375,7 +345,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -393,8 +363,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -409,20 +378,22 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_2"), TableNameMapping: map[string]string{srcTableName: dstTableName}, 
PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -437,7 +408,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -455,8 +426,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -471,17 +441,19 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_3"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -512,7 +484,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -530,8 +502,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -545,20 +516,22 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_4"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -580,7 +553,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -598,8 +571,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) 
Test_Toast_Advance_3_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -614,20 +586,22 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_5"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -648,7 +622,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -666,8 +640,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -682,21 +655,23 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_bq"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -719,88 +694,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { ARRAY[0.0003, 8902.0092], ARRAY['hello','bye']; `, srcTableName)) - require.NoError(t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") - - noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{"c41", "c1", "c2", "c3", "c4", - "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", - "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44"}) - if err != nil { - fmt.Println("error %w", err) - } - // Make sure that there are no nulls - s.True(noNulls) - - env.AssertExpectations(s.T()) -} - -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { - t.Parallel() - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) - - srcTableName := s.attachSchemaSuffix("test_types_avro_bq") - dstTableName := 
"test_types_avro_bq" - - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, - c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, - c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, - c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, - c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); - `, srcTableName)) - require.NoError(t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_types_avro_bq"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, - Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", - } - - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) - - limits := peerflow.CDCFlowLimits{ - - TotalSyncFlows: 1, - MaxBatchSize: 100, - } - - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - /* test inserting various types*/ - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s SELECT 2,2,b'1',b'101', - true,random_bytea(32),'s','test','1.1.10.2'::cidr, - CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, - '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, - '{"sai":1}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, - 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, - 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, - txid_current_snapshot(), - '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), - ARRAY[9301,239827], - ARRAY[0.0003, 1039.0034], - ARRAY['hello','bye']; - `, srcTableName)) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -826,74 +720,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC(t *testing.T) { - t.Parallel() - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) - - srcTableName := s.attachSchemaSuffix("test_simple_flow_bq_avro_cdc") - dstTableName := "test_simple_flow_bq_avro_cdc" - - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - require.NoError(t, err) - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow_bq_avro_cdc"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, - Destination: s.bqHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", - } - - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) - - limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, - } - - go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - for i := 0; i < 10; i++ { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := 
fmt.Sprintf("test_value_%d", i) - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - require.NoError(t, err) - } - fmt.Println("Inserted 10 rows into the source table") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") - - count, err := s.bqHelper.countRows(dstTableName) - require.NoError(t, err) - s.Equal(10, count) - - // TODO: verify that the data is correctly synced to the destination table - // on the bigquery side - - env.AssertExpectations(s.T()) -} - -func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -906,20 +733,22 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s(id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table_bq"), TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -932,7 +761,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed an insert on two tables") }() @@ -943,9 +772,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { err = env.GetWorkflowError() count1, err := s.bqHelper.countRows(dstTable1Name) - require.NoError(t, err) + s.NoError(err) count2, err := s.bqHelper.countRows(dstTable2Name) - require.NoError(t, err) + s.NoError(err) s.Equal(1, count1) s.Equal(1, count2) @@ -954,8 +783,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { } // TODO: not checking schema exactly, add later -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -968,17 +796,19 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { c1 BIGINT ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := 
peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -992,7 +822,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -1002,11 +832,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1016,11 +846,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1030,11 +860,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. 
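For context on the `CdcStagingPath: ""` these BigQuery tests now set: with no staging bucket configured, the reworked `writeToStage` earlier in this diff writes the Avro file to a local temp directory and hands it to BigQuery as a reader source instead of a GCS reference. A minimal sketch of that branching, using the `cloud.google.com/go/bigquery` types from this diff (the wrapper function and its parameters are illustrative):

```go
package example

import (
	"fmt"
	"os"

	"cloud.google.com/go/bigquery"
)

// newAvroLoadSource sketches the staging decision: with a GCS bucket the Avro
// file is loaded from GCS, otherwise a locally written Avro file is streamed
// to BigQuery via a ReaderSource. An empty CdcStagingPath in the tests above
// exercises the local branch.
func newAvroLoadSource(gcsBucket, avroFilePath string) (bigquery.LoadSource, error) {
	if gcsBucket != "" {
		gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", gcsBucket, avroFilePath))
		gcsRef.SourceFormat = bigquery.Avro
		return gcsRef, nil
	}
	fh, err := os.Open(avroFilePath)
	if err != nil {
		return nil, fmt.Errorf("failed to read local Avro file: %w", err)
	}
	localRef := bigquery.NewReaderSource(fh)
	localRef.SourceFormat = bigquery.Avro
	return localRef, nil
}
```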
@@ -1055,8 +885,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1072,17 +901,19 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1099,7 +930,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") @@ -1109,9 +940,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1129,8 +960,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1147,17 +977,19 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1169,7 +1001,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - require.NoError(t, err) + s.NoError(err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1177,18 +1009,18 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") _, err = 
rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) err = rowsTx.Commit(context.Background()) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1207,8 +1039,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1225,17 +1056,19 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1253,16 +1086,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 8bd4b6135..8a97c3b85 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -39,9 +39,9 @@ func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsSt // read rows from destination table qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqRows, err := s.bqHelper.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName), - ) + bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) + fmt.Printf("running query on bigquery: %s\n", bqSelQuery) + bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) s.NoError(err) s.True(pgRows.Equals(bqRows), "rows from source and destination tables are not equal") @@ -81,43 +81,3 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env.AssertExpectations(s.T()) } - -// NOTE: Disabled due to large JSON tests being added: https://github.com/PeerDB-io/peerdb/issues/309 - -// Test_Complete_QRep_Flow tests a complete flow with data in the source table. 
-// The test inserts 10 rows into the source table and verifies that the data is -// // correctly synced to the destination table this runs a QRep Flow. -// func (s *E2EPeerFlowTestSuite) Test_Complete_QRep_Flow_Multi_Insert() { -// env := s.NewTestWorkflowEnvironment() -// registerWorkflowsAndActivities(env) - -// numRows := 10 - -// tblName := "test_qrep_flow_multi_insert" -// s.setupSourceTable(tblName, numRows) -// s.setupBQDestinationTable(tblName) - -// query := fmt.Sprintf("SELECT * FROM e2e_test.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", tblName) - -// qrepConfig := s.createQRepWorkflowConfig("test_qrep_flow_mi", -// "e2e_test."+tblName, -// tblName, -// query, -// protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT, -// s.bqHelper.Peer) -// runQrepFlowWorkflow(env, qrepConfig) - -// // Verify workflow completes without error -// s.True(env.IsWorkflowCompleted()) - -// // assert that error contains "invalid connection configs" -// err := env.GetWorkflowError() -// s.NoError(err) - -// count, err := s.bqHelper.CountRows(tblName) -// s.NoError(err) - -// s.Equal(numRows, count) - -// env.AssertExpectations(s.T()) -// } diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 3a0f71b15..72a756e53 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -95,7 +95,6 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) { } _, err = pool.Exec(context.Background(), ` - SELECT pg_advisory_lock(hashtext('peerdb_pg_setup_lock')); CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 95bb972a7..37848f038 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -32,42 +32,7 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - s := &PeerFlowE2ETestSuiteSF{} - s.SetT(t) - s.SetupSuite() - - tests := []struct { - name string - test func(t *testing.T) - }{ - {"Test_Complete_Simple_Flow_SF", s.Test_Complete_Simple_Flow_SF}, - {"Test_Complete_Simple_Flow_SF_Avro_CDC", s.Test_Complete_Simple_Flow_SF_Avro_CDC}, - {"Test_Invalid_Geo_SF_Avro_CDC", s.Test_Invalid_Geo_SF_Avro_CDC}, - {"Test_Toast_SF", s.Test_Toast_SF}, - {"Test_Toast_Nochanges_SF", s.Test_Toast_Nochanges_SF}, - {"Test_Toast_Advance_1_SF", s.Test_Toast_Advance_1_SF}, - {"Test_Toast_Advance_2_SF", s.Test_Toast_Advance_2_SF}, - {"Test_Toast_Advance_3_SF", s.Test_Toast_Advance_3_SF}, - {"Test_Types_SF", s.Test_Types_SF}, - {"Test_Types_SF_Avro_CDC", s.Test_Types_SF_Avro_CDC}, - {"Test_Multi_Table_SF", s.Test_Multi_Table_SF}, - {"Test_Simple_Schema_Changes_SF", s.Test_Simple_Schema_Changes_SF}, - {"Test_Composite_PKey_SF", s.Test_Composite_PKey_SF}, - {"Test_Composite_PKey_Toast_1_SF", s.Test_Composite_PKey_Toast_1_SF}, - {"Test_Composite_PKey_Toast_2_SF", s.Test_Composite_PKey_Toast_2_SF}, - {"Test_Column_Exclusion", s.Test_Column_Exclusion}, - } - - // assert that there are no duplicate test names - testNames := make(map[string]bool) - for _, tt := range tests { - if testNames[tt.name] { - t.Fatalf("duplicate test name: %s", tt.name) - } - testNames[tt.name] = true - - t.Run(tt.name, tt.test) - } + suite.Run(t, new(PeerFlowE2ETestSuiteSF)) } func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { @@ -149,8 +114,7 @@ func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { 
require.NoError(s.T(), err) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -164,17 +128,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { value TEXT NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -182,17 +147,17 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 10 rows into the source table + // and then insert 15 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 10 rows into the source table - for i := 0; i < 10; i++ { + // insert 15 rows into the source table + for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -208,8 +173,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { s.Contains(err.Error(), "continue as new") count, err := s.sfHelper.CountRows("test_simple_flow_sf") - require.NoError(t, err) - s.Equal(10, count) + s.NoError(err) + s.Equal(20, count) // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago // it should match the count. 
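Most of the Avro changes in this diff converge on the reworked OCF writer: `NewPeerDBOCFWriterWithCompression` is gone, and callers pass the compression codec and target warehouse to `NewPeerDBOCFWriter` directly (Snowflake stages now use zstd, hence the `.avro.zst` key suffix). A minimal usage sketch with names taken from this diff (the wrapper function itself is illustrative):

```go
package example

import (
	"context"
	"fmt"

	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
	"github.com/PeerDB-io/peer-flow/model"
	"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// writeCompressedAvro shows the new call shape: codec and target warehouse
// are explicit arguments rather than implied by the constructor used.
func writeCompressedAvro(ctx context.Context, stream *model.QRecordStream,
	schema *model.QRecordAvroSchemaDefinition, path string) (int, error) {
	writer := avro.NewPeerDBOCFWriter(ctx, stream, schema, avro.CompressZstd, qvalue.QDWHTypeSnowflake)
	numRecords, err := writer.WriteRecordsToAvroFile(path)
	if err != nil {
		return 0, fmt.Errorf("failed to write records to Avro file: %w", err)
	}
	return numRecords, nil
}
```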
@@ -217,87 +182,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE' `, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - require.NoError(t, err) - s.Equal(10, numNewRows) + s.NoError(err) + s.Equal(20, numNewRows) // TODO: verify that the data is correctly synced to the destination table - // on the bigquery side + // on the Snowflake side env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) { - t.Parallel() - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) - - tblConst := "test_simple_flow_sf_avro_cdc" - srcTableName := s.attachSchemaSuffix(tblConst) - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblConst) - - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - require.NoError(t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow_avro"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, - Destination: s.sfHelper.Peer, - CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - } - - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) - - limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, - } - - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 10 rows into the source table - go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 10 rows into the source table - for i := 0; i < 15; i++ { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - require.NoError(t, err) - } - fmt.Println("Inserted 15 rows into the source table") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") - - count, err := s.sfHelper.CountRows(tblConst) - require.NoError(t, err) - s.Equal(15, count) - - // TODO: verify that the data is correctly synced to the destination table - // on the bigquery side - - env.AssertExpectations(s.T()) -} - -func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -311,7 +205,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { poly GEOGRAPHY(POLYGON) NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"), @@ -322,7 +216,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ 
-343,7 +237,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", ) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { @@ -353,7 +247,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ "00f03f000000000000000000000000000000000000000000000000") - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 6 valid geography rows and 10 total rows into source") }() @@ -371,11 +265,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { // We inserted 4 invalid shapes in each. // They should have filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") - require.NoError(t, err) + s.NoError(err) s.Equal(6, lineCount) polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly") - require.NoError(t, err) + s.NoError(err) s.Equal(6, polyCount) // TODO: verify that the data is correctly synced to the destination table @@ -384,8 +278,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -400,20 +293,21 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_1"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -435,7 +329,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -453,8 +347,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -471,20 +364,21 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { ); `, srcTableName, srcTableName)) log.Infof("Creating table '%s', err: %v", srcTableName, err) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_2"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + 
CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -499,7 +393,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -517,8 +411,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -534,17 +427,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { k int ); `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_3"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -575,7 +469,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -593,8 +487,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -609,20 +502,21 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { k int ); `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_4"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -644,7 +538,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -662,8 +556,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { 
env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -679,20 +572,21 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { k int ); `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_5"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -713,7 +607,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -731,8 +625,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -749,86 +642,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); `, srcTableName, srcTableName)) - require.NoError(t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_types_sf"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, - Destination: s.sfHelper.Peer, - } - - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) - - limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 100, - } - - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - /* test inserting various types*/ - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s SELECT 2,2,b'1',b'101', - true,random_bytea(32),'s','test','1.1.10.2'::cidr, - CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, - '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, - '{"sai":1}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, - 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, - 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, - txid_current_snapshot(), - '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), - 'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', - 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', - 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; - `, srcTableName)) - require.NoError(t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - 
s.Error(err) - s.Contains(err.Error(), "continue as new") - - noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", - "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", - "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) - if err != nil { - fmt.Println("error %w", err) - } - // Make sure that there are no nulls - s.Equal(noNulls, true) - - env.AssertExpectations(s.T()) -} - -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { - t.Parallel() - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env) - - srcTableName := s.attachSchemaSuffix("test_types_sf_avro_cdc") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf_avro_cdc") - - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, - c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, - c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, - c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, - c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), - c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); - `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -839,10 +653,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -865,7 +679,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -878,7 +692,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { s.Error(err) s.Contains(err.Error(), "continue as new") - noNulls, err := s.sfHelper.CheckNull("test_types_sf_avro_cdc", []string{"c41", "c1", "c2", "c3", "c4", + noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) @@ -891,8 +705,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -905,20 +718,21 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); CREATE TABLE IF 
NOT EXISTS %s (id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table"), TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -931,7 +745,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -941,9 +755,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") - require.NoError(t, err) + s.NoError(err) count2, err := s.sfHelper.CountRows("test2_sf") - require.NoError(t, err) + s.NoError(err) s.Equal(1, count1) s.Equal(1, count2) @@ -951,8 +765,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -965,17 +778,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { c1 BIGINT ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -989,7 +803,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -1006,18 +820,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(t, err) + s.NoError(err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. 
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1035,18 +849,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(t, err) + s.NoError(err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1065,18 +879,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(t, err) + s.NoError(err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. 
@@ -1095,7 +909,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(t, err) + s.NoError(err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) }() @@ -1113,8 +927,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1130,17 +943,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 5, @@ -1157,7 +971,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") @@ -1167,9 +981,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1188,8 +1002,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1206,17 +1019,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1228,7 +1042,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - require.NoError(t, err) + s.NoError(err) // insert 10 rows into the source table for i := 0; i < 
10; i++ { @@ -1236,18 +1050,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) err = rowsTx.Commit(context.Background()) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1266,8 +1080,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1284,17 +1097,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, + CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1312,16 +1126,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1340,8 +1154,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1358,7 +1171,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_exclude_flow"), @@ -1395,20 +1208,20 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100)) `, srcTableName), i, 
testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() s.Error(err) s.Contains(err.Error(), "continue as new") @@ -1416,11 +1229,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id", s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query) - require.NoError(t, err) + s.NoError(err) for _, field := range sfRows.Schema.Fields { - require.NotEqual(t, field.Name, "c2") + s.NotEqual(field.Name, "c2") } - require.Equal(t, 4, len(sfRows.Schema.Fields)) - require.Equal(t, 10, len(sfRows.Records)) + s.Equal(4, len(sfRows.Schema.Fields)) + s.Equal(10, len(sfRows.Records)) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index f26afc2ee..b57ad7221 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -71,7 +71,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, log.Errorln(err) } - if state.SetupComplete { + if state.SnapshotComplete { break } } else { @@ -293,7 +293,6 @@ func CreateQRepWorkflowConfig( if err != nil { return nil, err } - qrepConfig.InitialCopyOnly = true return qrepConfig, nil @@ -301,6 +300,7 @@ func CreateQRepWorkflowConfig( func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) { state := peerflow.NewQRepFlowState() + time.Sleep(5 * time.Second) env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state) } diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index df610088a..bf29f3345 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -2659,8 +2659,8 @@ type QRepConfig struct { // This is only used when sync_mode is AVRO // this is the location where the avro files will be written // if this starts with gs:// then it will be written to GCS - // if this starts with s3:// then it will be written to S3 - // if nothing is specified then it will be written to local disk, only supported in Snowflake + // if this starts with s3:// then it will be written to S3, only supported in Snowflake + // if nothing is specified then it will be written to local disk // if using GCS or S3 make sure your instance has the correct permissions. 
StagingPath string `protobuf:"bytes,15,opt,name=staging_path,json=stagingPath,proto3" json:"staging_path,omitempty"` // This setting overrides batch_size_int and batch_duration_seconds @@ -3225,6 +3225,7 @@ type QRepFlowState struct { LastPartition *QRepPartition `protobuf:"bytes,1,opt,name=last_partition,json=lastPartition,proto3" json:"last_partition,omitempty"` NumPartitionsProcessed uint64 `protobuf:"varint,2,opt,name=num_partitions_processed,json=numPartitionsProcessed,proto3" json:"num_partitions_processed,omitempty"` NeedsResync bool `protobuf:"varint,3,opt,name=needs_resync,json=needsResync,proto3" json:"needs_resync,omitempty"` + DisableWaitForNewRows bool `protobuf:"varint,4,opt,name=disable_wait_for_new_rows,json=disableWaitForNewRows,proto3" json:"disable_wait_for_new_rows,omitempty"` } func (x *QRepFlowState) Reset() { @@ -3280,6 +3281,13 @@ func (x *QRepFlowState) GetNeedsResync() bool { return false } +func (x *QRepFlowState) GetDisableWaitForNewRows() bool { + if x != nil { + return x.DisableWaitForNewRows + } + return false +} + var File_flow_proto protoreflect.FileDescriptor var file_flow_proto_rawDesc = []byte{ @@ -3941,7 +3949,7 @@ var file_flow_proto_rawDesc = []byte{ 0x64, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x52, 0x11, 0x74, 0x61, 0x62, - 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x22, 0xaf, + 0x6c, 0x65, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x73, 0x22, 0xe9, 0x01, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x41, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, @@ -3953,26 +3961,30 @@ var file_flow_proto_rawDesc = []byte{ 0x69, 0x6f, 0x6e, 0x73, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x6e, 0x65, 0x65, 0x64, 0x73, 0x5f, 0x72, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x6e, 0x65, 0x65, 0x64, 0x73, 0x52, 0x65, 0x73, 0x79, 0x6e, 0x63, - 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, - 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, - 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, - 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, - 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, - 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, - 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, - 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, - 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, - 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, - 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, - 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, 0x54, 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, - 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 
0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, - 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, - 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, - 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, - 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, - 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, - 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x38, 0x0a, 0x19, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x77, 0x61, 0x69, 0x74, + 0x5f, 0x66, 0x6f, 0x72, 0x5f, 0x6e, 0x65, 0x77, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x15, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x57, 0x61, 0x69, 0x74, + 0x46, 0x6f, 0x72, 0x4e, 0x65, 0x77, 0x52, 0x6f, 0x77, 0x73, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, + 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, + 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, + 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, + 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, + 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x66, 0x0a, 0x0d, + 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, + 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, + 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, + 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, + 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, + 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4f, 0x56, 0x45, 0x52, 0x57, 0x52, 0x49, + 0x54, 0x45, 0x10, 0x02, 0x42, 0x76, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x42, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x50, 0x72, 0x6f, + 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x50, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xca, 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0xe2, 0x02, 0x16, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, + 0x6c, 0x6f, 0x77, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, + 0x02, 0x0a, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x46, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/flow/go.sum b/flow/go.sum index 3122b2471..1e3fe7ff8 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -289,6 +289,8 @@ github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.3 
h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= +github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 25f4f7b20..27ebc4014 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -21,31 +21,32 @@ type QRecordBatch struct { // Equals checks if two QRecordBatches are identical. func (q *QRecordBatch) Equals(other *QRecordBatch) bool { if other == nil { + fmt.Printf("other is nil") return q == nil } // First check simple attributes if q.NumRecords != other.NumRecords { // print num records - log.Infof("q.NumRecords: %d\n", q.NumRecords) - log.Infof("other.NumRecords: %d\n", other.NumRecords) + fmt.Printf("q.NumRecords: %d\n", q.NumRecords) + fmt.Printf("other.NumRecords: %d\n", other.NumRecords) return false } // Compare column names if !q.Schema.EqualNames(other.Schema) { - log.Infof("Column names are not equal") - log.Infof("Schema 1: %v", q.Schema.GetColumnNames()) - log.Infof("Schema 2: %v", other.Schema.GetColumnNames()) + fmt.Printf("Column names are not equal\n") + fmt.Printf("Schema 1: %v\n", q.Schema.GetColumnNames()) + fmt.Printf("Schema 2: %v\n", other.Schema.GetColumnNames()) return false } // Compare records for i, record := range q.Records { if !record.equals(other.Records[i]) { - log.Infof("Record %d is not equal", i) - log.Infof("Record 1: %v", record) - log.Infof("Record 2: %v", other.Records[i]) + fmt.Printf("Record %d is not equal\n", i) + fmt.Printf("Record 1: %v\n", record) + fmt.Printf("Record 2: %v\n", other.Records[i]) return false } } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index f20d17951..3b8e77a68 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -34,7 +34,7 @@ type QRepPartitionFlowExecution struct { runUUID string } -// returns a new empty PeerFlowState +// returns a new empty QRepFlowState func NewQRepFlowState() *protos.QRepFlowState { return &protos.QRepFlowState{ LastPartition: &protos.QRepPartition{ @@ -46,6 +46,19 @@ func NewQRepFlowState() *protos.QRepFlowState { } } +// returns a new empty QRepFlowState +func NewQRepFlowStateForTesting() *protos.QRepFlowState { + return &protos.QRepFlowState{ + LastPartition: &protos.QRepPartition{ + PartitionId: "not-applicable-partition", + Range: nil, + }, + NumPartitionsProcessed: 0, + NeedsResync: true, + DisableWaitForNewRows: true, + } +} + // NewQRepFlowExecution creates a new instance of QRepFlowExecution. 
func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution { return &QRepFlowExecution{ @@ -440,10 +453,12 @@ func QRepFlowWorkflow( state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1] } - // sleep for a while and continue the workflow - err = q.waitForNewRows(ctx, state.LastPartition) - if err != nil { - return err + if !state.DisableWaitForNewRows { + // sleep for a while and continue the workflow + err = q.waitForNewRows(ctx, state.LastPartition) + if err != nil { + return err + } } workflow.GetLogger(ctx).Info("Continuing as new workflow", diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index 67ba78e80..dc308131e 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -457,8 +457,8 @@ pub struct QRepConfig { /// This is only used when sync_mode is AVRO /// this is the location where the avro files will be written /// if this starts with gs:// then it will be written to GCS - /// if this starts with s3:// then it will be written to S3 - /// if nothing is specified then it will be written to local disk, only supported in Snowflake + /// if this starts with s3:// then it will be written to S3, only supported in Snowflake + /// if nothing is specified then it will be written to local disk /// if using GCS or S3 make sure your instance has the correct permissions. #[prost(string, tag="15")] pub staging_path: ::prost::alloc::string::String, @@ -540,6 +540,8 @@ pub struct QRepFlowState { pub num_partitions_processed: u64, #[prost(bool, tag="3")] pub needs_resync: bool, + #[prost(bool, tag="4")] + pub disable_wait_for_new_rows: bool, } /// protos for qrep #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs index ebcd1ffe5..0436bf334 100644 --- a/nexus/pt/src/peerdb_flow.serde.rs +++ b/nexus/pt/src/peerdb_flow.serde.rs @@ -3004,6 +3004,9 @@ impl serde::Serialize for QRepFlowState { if self.needs_resync { len += 1; } + if self.disable_wait_for_new_rows { + len += 1; + } let mut struct_ser = serializer.serialize_struct("peerdb_flow.QRepFlowState", len)?; if let Some(v) = self.last_partition.as_ref() { struct_ser.serialize_field("lastPartition", v)?; @@ -3014,6 +3017,9 @@ impl serde::Serialize for QRepFlowState { if self.needs_resync { struct_ser.serialize_field("needsResync", &self.needs_resync)?; } + if self.disable_wait_for_new_rows { + struct_ser.serialize_field("disableWaitForNewRows", &self.disable_wait_for_new_rows)?; + } struct_ser.end() } } @@ -3030,6 +3036,8 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState { "numPartitionsProcessed", "needs_resync", "needsResync", + "disable_wait_for_new_rows", + "disableWaitForNewRows", ]; #[allow(clippy::enum_variant_names)] @@ -3037,6 +3045,7 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState { LastPartition, NumPartitionsProcessed, NeedsResync, + DisableWaitForNewRows, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -3062,6 +3071,7 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState { "lastPartition" | "last_partition" => Ok(GeneratedField::LastPartition), "numPartitionsProcessed" | "num_partitions_processed" => Ok(GeneratedField::NumPartitionsProcessed), "needsResync" | "needs_resync" => Ok(GeneratedField::NeedsResync), + "disableWaitForNewRows" | "disable_wait_for_new_rows" => Ok(GeneratedField::DisableWaitForNewRows), _ => Ok(GeneratedField::__SkipField__), } } @@ -3084,6 
+3094,7 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState { let mut last_partition__ = None; let mut num_partitions_processed__ = None; let mut needs_resync__ = None; + let mut disable_wait_for_new_rows__ = None; while let Some(k) = map.next_key()? { match k { GeneratedField::LastPartition => { @@ -3106,6 +3117,12 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState { } needs_resync__ = Some(map.next_value()?); } + GeneratedField::DisableWaitForNewRows => { + if disable_wait_for_new_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("disableWaitForNewRows")); + } + disable_wait_for_new_rows__ = Some(map.next_value()?); + } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } @@ -3115,6 +3132,7 @@ impl<'de> serde::Deserialize<'de> for QRepFlowState { last_partition: last_partition__, num_partitions_processed: num_partitions_processed__.unwrap_or_default(), needs_resync: needs_resync__.unwrap_or_default(), + disable_wait_for_new_rows: disable_wait_for_new_rows__.unwrap_or_default(), }) } } diff --git a/protos/flow.proto b/protos/flow.proto index 6289f993f..281f60999 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -307,8 +307,8 @@ message QRepConfig { // This is only used when sync_mode is AVRO // this is the location where the avro files will be written // if this starts with gs:// then it will be written to GCS - // if this starts with s3:// then it will be written to S3 - // if nothing is specified then it will be written to local disk, only supported in Snowflake + // if this starts with s3:// then it will be written to S3, only supported in Snowflake + // if nothing is specified then it will be written to local disk // if using GCS or S3 make sure your instance has the correct permissions. string staging_path = 15; @@ -364,4 +364,5 @@ message QRepFlowState { QRepPartition last_partition = 1; uint64 num_partitions_processed = 2; bool needs_resync = 3; + bool disable_wait_for_new_rows = 4; } diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index 95118814f..3e8f36e97 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -416,8 +416,8 @@ export interface QRepConfig { * This is only used when sync_mode is AVRO * this is the location where the avro files will be written * if this starts with gs:// then it will be written to GCS - * if this starts with s3:// then it will be written to S3 - * if nothing is specified then it will be written to local disk, only supported in Snowflake + * if this starts with s3:// then it will be written to S3, only supported in Snowflake + * if nothing is specified then it will be written to local disk * if using GCS or S3 make sure your instance has the correct permissions. 
*/ stagingPath: string; @@ -475,6 +475,7 @@ export interface QRepFlowState { lastPartition: QRepPartition | undefined; numPartitionsProcessed: number; needsResync: boolean; + disableWaitForNewRows: boolean; } function createBaseTableNameMapping(): TableNameMapping { @@ -6155,7 +6156,7 @@ export const ReplayTableSchemaDeltaInput = { }; function createBaseQRepFlowState(): QRepFlowState { - return { lastPartition: undefined, numPartitionsProcessed: 0, needsResync: false }; + return { lastPartition: undefined, numPartitionsProcessed: 0, needsResync: false, disableWaitForNewRows: false }; } export const QRepFlowState = { @@ -6169,6 +6170,9 @@ export const QRepFlowState = { if (message.needsResync === true) { writer.uint32(24).bool(message.needsResync); } + if (message.disableWaitForNewRows === true) { + writer.uint32(32).bool(message.disableWaitForNewRows); + } return writer; }, @@ -6200,6 +6204,13 @@ export const QRepFlowState = { message.needsResync = reader.bool(); continue; + case 4: + if (tag !== 32) { + break; + } + + message.disableWaitForNewRows = reader.bool(); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -6214,6 +6225,7 @@ export const QRepFlowState = { lastPartition: isSet(object.lastPartition) ? QRepPartition.fromJSON(object.lastPartition) : undefined, numPartitionsProcessed: isSet(object.numPartitionsProcessed) ? Number(object.numPartitionsProcessed) : 0, needsResync: isSet(object.needsResync) ? Boolean(object.needsResync) : false, + disableWaitForNewRows: isSet(object.disableWaitForNewRows) ? Boolean(object.disableWaitForNewRows) : false, }; }, @@ -6228,6 +6240,9 @@ export const QRepFlowState = { if (message.needsResync === true) { obj.needsResync = message.needsResync; } + if (message.disableWaitForNewRows === true) { + obj.disableWaitForNewRows = message.disableWaitForNewRows; + } return obj; }, @@ -6241,6 +6256,7 @@ export const QRepFlowState = { : undefined; message.numPartitionsProcessed = object.numPartitionsProcessed ?? 0; message.needsResync = object.needsResync ?? false; + message.disableWaitForNewRows = object.disableWaitForNewRows ?? false; return message; }, };
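
Note on the new disable_wait_for_new_rows flag: the flow.proto, qrep_flow.go, and test_utils.go changes above add a DisableWaitForNewRows field to QRepFlowState (pre-set by NewQRepFlowStateForTesting) so that QRepFlowWorkflow can skip waitForNewRows and continue-as-new immediately during e2e tests. Below is a minimal standalone sketch of that gating, assuming local stand-in types; qrepFlowState and runOnce here are illustrative only and are not the generated protos.QRepFlowState or the real workflow code.

    package main

    import "fmt"

    // qrepFlowState is a local stand-in mirroring the fields relevant to this change.
    type qrepFlowState struct {
        NeedsResync           bool
        DisableWaitForNewRows bool
    }

    // runOnce mimics the tail of one QRepFlowWorkflow iteration: normally the workflow
    // blocks in waitForNewRows before continuing as new; with the flag set (as in
    // NewQRepFlowStateForTesting) it skips the wait entirely.
    func runOnce(state *qrepFlowState, waitForNewRows func() error) error {
        if !state.DisableWaitForNewRows {
            return waitForNewRows()
        }
        return nil
    }

    func main() {
        testState := &qrepFlowState{NeedsResync: true, DisableWaitForNewRows: true}
        if err := runOnce(testState, func() error {
            fmt.Println("would block here waiting for new rows")
            return nil
        }); err != nil {
            fmt.Println("error:", err)
        }
    }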