From a9b982f35af857e31d7839670d9b91b5cf85251d Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Tue, 21 Nov 2023 23:00:22 +0530 Subject: [PATCH] unparallel your tests --- flow/connectors/bigquery/qrep_avro_sync.go | 4 +- flow/e2e/bigquery/peer_flow_bq_test.go | 234 ++++++++------------- flow/e2e/congen.go | 15 -- flow/e2e/snowflake/peer_flow_sf_test.go | 227 ++++++++------------ flow/model/qrecord_batch.go | 17 +- 5 files changed, 191 insertions(+), 306 deletions(-) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 97c043ac04..5b9b99b13d 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -331,7 +331,7 @@ func (s *QRepAvroSyncMethod) writeToStage( avro.CompressNone, qvalue.QDWHTypeBigQuery) if s.gcsBucket != "" { bucket := s.connector.storageClient.Bucket(s.gcsBucket) - avroFilePath = fmt.Sprintf("%s/%s.avro", objectFolder, syncID) + avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", objectFolder, syncID) obj := bucket.Object(avroFilePath) w := obj.NewWriter(s.connector.ctx) @@ -346,7 +346,7 @@ func (s *QRepAvroSyncMethod) writeToStage( return 0, fmt.Errorf("failed to create temp dir: %w", err) } - avroFilePath = fmt.Sprintf("%s/%s.avro", tmpDir, syncID) + avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", tmpDir, syncID) log.WithFields(log.Fields{ "batchOrPartitionID": syncID, }).Infof("writing records to local file %s", avroFilePath) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 8d519b4f99..dd4954284b 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "testing" + "time" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -28,45 +29,7 @@ type PeerFlowE2ETestSuiteBQ struct { } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - s := &PeerFlowE2ETestSuiteBQ{} - s.SetT(t) - s.SetupSuite() - 
- tests := []struct { - name string - test func(t *testing.T) - }{ - {"Test_Invalid_Connection_Config", s.Test_Invalid_Connection_Config}, - {"Test_Complete_Flow_No_Data", s.Test_Complete_Flow_No_Data}, - {"Test_Char_ColType_Error", s.Test_Char_ColType_Error}, - {"Test_Complete_Simple_Flow_BQ", s.Test_Complete_Simple_Flow_BQ}, - {"Test_Toast_BQ", s.Test_Toast_BQ}, - {"Test_Toast_Nochanges_BQ", s.Test_Toast_Nochanges_BQ}, - {"Test_Toast_Advance_1_BQ", s.Test_Toast_Advance_1_BQ}, - {"Test_Toast_Advance_2_BQ", s.Test_Toast_Advance_2_BQ}, - {"Test_Toast_Advance_3_BQ", s.Test_Toast_Advance_3_BQ}, - {"Test_Types_BQ", s.Test_Types_BQ}, - {"Test_Multi_Table_BQ", s.Test_Multi_Table_BQ}, - {"Test_Simple_Schema_Changes_BQ", s.Test_Simple_Schema_Changes_BQ}, - {"Test_Composite_PKey_BQ", s.Test_Composite_PKey_BQ}, - {"Test_Composite_PKey_Toast_1_BQ", s.Test_Composite_PKey_Toast_1_BQ}, - {"Test_Composite_PKey_Toast_2_BQ", s.Test_Composite_PKey_Toast_2_BQ}, - } - - // Assert that there are no duplicate test names - testNames := make(map[string]bool) - for _, tt := range tests { - if testNames[tt.name] { - t.Fatalf("duplicate test name: %s", tt.name) - } - testNames[tt.name] = true - - t.Run(tt.name, tt.test) - } - - t.Cleanup(func() { - s.TearDownSuite() - }) + suite.Run(t, new(PeerFlowE2ETestSuiteBQ)) } func (s *PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { @@ -115,7 +78,9 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { s.setupTemporalLogger() - s.bqSuffix = strings.ToLower(util.RandomString(8)) + suffix := util.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + s.bqSuffix = fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) pool, err := e2e.SetupPostgres(s.bqSuffix) if err != nil { s.Fail("failed to setup postgres", err) @@ -141,8 +106,7 @@ func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { } } -func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) 
Test_Invalid_Connection_Config() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -165,8 +129,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -180,7 +143,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { value VARCHAR(255) NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_flow_no_data"), @@ -188,11 +151,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -212,8 +175,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -227,7 +189,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { value CHAR(255) NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_char_table"), @@ -235,11 +197,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { PostgresPort: e2e.PostgresPort, 
Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -262,8 +224,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { // Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -277,7 +238,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { value TEXT NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_simple_flow"), @@ -285,14 +246,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, + TotalSyncFlows: 3, MaxBatchSize: 100, } @@ -307,7 +268,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source 
table") }() @@ -323,7 +284,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { s.Contains(err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - require.NoError(t, err) + s.NoError(err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -332,8 +293,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -348,7 +308,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_1"), @@ -356,11 +316,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -385,7 +345,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -403,8 +363,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -419,7 
+378,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_2"), @@ -427,11 +386,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -449,7 +408,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -467,8 +426,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -483,7 +441,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_3"), @@ -491,11 +449,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := 
peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -526,7 +484,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -544,8 +502,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -559,7 +516,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_4"), @@ -567,11 +524,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -596,7 +553,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -614,8 +571,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t 
*testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -630,7 +586,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_5"), @@ -638,11 +594,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -666,7 +622,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -684,8 +640,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -700,7 +655,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_bq"), @@ -708,11 +663,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { 
PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ @@ -739,7 +694,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { ARRAY[0.0003, 8902.0092], ARRAY['hello','bye']; `, srcTableName)) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -765,8 +720,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -779,7 +733,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s(id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table_bq"), @@ -787,11 +741,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -807,7 +761,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - 
require.NoError(t, err) + s.NoError(err) fmt.Println("Executed an insert on two tables") }() @@ -818,9 +772,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { err = env.GetWorkflowError() count1, err := s.bqHelper.countRows(dstTable1Name) - require.NoError(t, err) + s.NoError(err) count2, err := s.bqHelper.countRows(dstTable2Name) - require.NoError(t, err) + s.NoError(err) s.Equal(1, count1) s.Equal(1, count2) @@ -829,8 +783,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { } // TODO: not checking schema exactly, add later -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -843,7 +796,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { c1 BIGINT ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -851,11 +804,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -869,7 +822,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. 
@@ -879,11 +832,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -893,11 +846,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -907,11 +860,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { // alter source table, drop column c3 and insert another row. 
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -932,8 +885,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -949,7 +901,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -957,11 +909,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -978,7 +930,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") @@ -988,9 +940,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { _, err := 
s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1008,8 +960,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1026,7 +977,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1034,11 +985,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1050,7 +1001,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - require.NoError(t, err) + s.NoError(err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1058,18 +1009,18 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO 
%s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) err = rowsTx.Commit(context.Background()) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1088,8 +1039,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1106,7 +1056,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1114,11 +1064,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CDCSyncMode: protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, - CdcStagingPath: "peerdb_staging", + CdcStagingPath: "", } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1136,16 +1086,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES 
($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 3a0f71b15a..fe1c6c029a 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -94,21 +94,6 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) { return nil, fmt.Errorf("failed to create e2e_test schema: %w", err) } - _, err = pool.Exec(context.Background(), ` - SELECT pg_advisory_lock(hashtext('peerdb_pg_setup_lock')); - CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; - `) if err != nil { return nil, fmt.Errorf("failed to create utility functions: %w", err) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 49aee77b2b..37848f0383 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -32,44 +32,7 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - s := &PeerFlowE2ETestSuiteSF{} - s.SetT(t) - s.SetupSuite() 
- - tests := []struct { - name string - test func(t *testing.T) - }{ - {"Test_Complete_Simple_Flow_SF", s.Test_Complete_Simple_Flow_SF}, - {"Test_Invalid_Geo_SF_Avro_CDC", s.Test_Invalid_Geo_SF_Avro_CDC}, - {"Test_Toast_SF", s.Test_Toast_SF}, - {"Test_Toast_Nochanges_SF", s.Test_Toast_Nochanges_SF}, - {"Test_Toast_Advance_1_SF", s.Test_Toast_Advance_1_SF}, - {"Test_Toast_Advance_2_SF", s.Test_Toast_Advance_2_SF}, - {"Test_Toast_Advance_3_SF", s.Test_Toast_Advance_3_SF}, - {"Test_Types_SF", s.Test_Types_SF}, - {"Test_Multi_Table_SF", s.Test_Multi_Table_SF}, - {"Test_Simple_Schema_Changes_SF", s.Test_Simple_Schema_Changes_SF}, - {"Test_Composite_PKey_SF", s.Test_Composite_PKey_SF}, - {"Test_Composite_PKey_Toast_1_SF", s.Test_Composite_PKey_Toast_1_SF}, - {"Test_Composite_PKey_Toast_2_SF", s.Test_Composite_PKey_Toast_2_SF}, - {"Test_Column_Exclusion", s.Test_Column_Exclusion}, - } - - // assert that there are no duplicate test names - testNames := make(map[string]bool) - for _, tt := range tests { - if testNames[tt.name] { - t.Fatalf("duplicate test name: %s", tt.name) - } - testNames[tt.name] = true - - t.Run(tt.name, tt.test) - } - - t.Cleanup(func() { - s.TearDownSuite() - }) + suite.Run(t, new(PeerFlowE2ETestSuiteSF)) } func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { @@ -151,8 +114,7 @@ func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { require.NoError(s.T(), err) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -166,7 +128,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { value TEXT NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), @@ -177,7 +139,7 @@ func (s *PeerFlowE2ETestSuiteSF) 
Test_Complete_Simple_Flow_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -195,7 +157,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -211,7 +173,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { s.Contains(err.Error(), "continue as new") count, err := s.sfHelper.CountRows("test_simple_flow_sf") - require.NoError(t, err) + s.NoError(err) s.Equal(20, count) // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago @@ -220,7 +182,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE' `, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - require.NoError(t, err) + s.NoError(err) s.Equal(20, numNewRows) // TODO: verify that the data is correctly synced to the destination table @@ -229,8 +191,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -244,7 +205,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { poly GEOGRAPHY(POLYGON) NOT NULL ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"), @@ -255,7 +216,7 @@ 
func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -276,7 +237,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", ) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { @@ -286,7 +247,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ "00f03f000000000000000000000000000000000000000000000000") - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 6 valid geography rows and 10 total rows into source") }() @@ -304,11 +265,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { // We inserted 4 invalid shapes in each. 
// They should have filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") - require.NoError(t, err) + s.NoError(err) s.Equal(6, lineCount) polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly") - require.NoError(t, err) + s.NoError(err) s.Equal(6, polyCount) // TODO: verify that the data is correctly synced to the destination table @@ -317,8 +278,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -333,7 +293,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { k int ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_1"), @@ -344,7 +304,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -369,7 +329,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -387,8 +347,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -405,7 +364,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { ); `, 
srcTableName, srcTableName)) log.Infof("Creating table '%s', err: %v", srcTableName, err) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_2"), @@ -416,7 +375,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -434,7 +393,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -452,8 +411,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -469,7 +427,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { k int ); `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_3"), @@ -480,7 +438,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -511,7 +469,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a 
transaction touching toast columns") }() @@ -529,8 +487,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -545,7 +502,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { k int ); `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_4"), @@ -556,7 +513,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -581,7 +538,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -599,8 +556,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -616,7 +572,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { k int ); `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_5"), @@ -627,7 +583,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { } flowConnConfig, 
err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -651,7 +607,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Executed a transaction touching toast columns") }() @@ -669,8 +625,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -687,7 +642,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); `, srcTableName, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -698,7 +653,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -724,7 +679,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -750,8 +705,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { 
env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -764,7 +718,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table"), @@ -775,7 +729,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -791,7 +745,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -801,9 +755,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") - require.NoError(t, err) + s.NoError(err) count2, err := s.sfHelper.CountRows("test2_sf") - require.NoError(t, err) + s.NoError(err) s.Equal(1, count1) s.Equal(1, count2) @@ -811,8 +765,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -825,7 +778,7 @@ func (s 
*PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { c1 BIGINT ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -836,7 +789,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -850,7 +803,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -867,18 +820,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(t, err) + s.NoError(err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. 
@@ -896,18 +849,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(t, err) + s.NoError(err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -926,18 +879,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(t, err) + s.NoError(err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) // alter source table, drop column c3 and insert another row. 
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - require.NoError(t, err) + s.NoError(err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - require.NoError(t, err) + s.NoError(err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -956,7 +909,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(t, err) + s.NoError(err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) }() @@ -974,8 +927,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -991,7 +943,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -1002,7 +954,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 5, @@ -1019,7 +971,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, 
srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") @@ -1029,9 +981,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1050,8 +1002,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1068,7 +1019,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1079,7 +1030,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1091,7 +1042,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - require.NoError(t, err) + s.NoError(err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1099,18 +1050,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { _, err = 
rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) err = rowsTx.Commit(context.Background()) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1129,8 +1080,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1147,7 +1097,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1158,7 +1108,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1176,16 +1126,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the 
source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1204,8 +1154,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { - t.Parallel() +func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1222,7 +1171,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_exclude_flow"), @@ -1259,20 +1208,20 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100)) `, srcTableName), i, testValue) - require.NoError(t, err) + s.NoError(err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = 
env.GetWorkflowError() s.Error(err) s.Contains(err.Error(), "continue as new") @@ -1280,11 +1229,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion(t *testing.T) { query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id", s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query) - require.NoError(t, err) + s.NoError(err) for _, field := range sfRows.Schema.Fields { - require.NotEqual(t, field.Name, "c2") + s.NotEqual(field.Name, "c2") } - require.Equal(t, 4, len(sfRows.Schema.Fields)) - require.Equal(t, 10, len(sfRows.Records)) + s.Equal(4, len(sfRows.Schema.Fields)) + s.Equal(10, len(sfRows.Records)) } diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 25f4f7b20c..27ebc4014e 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -21,31 +21,32 @@ type QRecordBatch struct { // Equals checks if two QRecordBatches are identical. func (q *QRecordBatch) Equals(other *QRecordBatch) bool { if other == nil { + fmt.Printf("other is nil\n") return q == nil } // First check simple attributes if q.NumRecords != other.NumRecords { // print num records - log.Infof("q.NumRecords: %d\n", q.NumRecords) - log.Infof("other.NumRecords: %d\n", other.NumRecords) + fmt.Printf("q.NumRecords: %d\n", q.NumRecords) + fmt.Printf("other.NumRecords: %d\n", other.NumRecords) return false } // Compare column names if !q.Schema.EqualNames(other.Schema) { - log.Infof("Column names are not equal") - log.Infof("Schema 1: %v", q.Schema.GetColumnNames()) - log.Infof("Schema 2: %v", other.Schema.GetColumnNames()) + fmt.Printf("Column names are not equal\n") + fmt.Printf("Schema 1: %v\n", q.Schema.GetColumnNames()) + fmt.Printf("Schema 2: %v\n", other.Schema.GetColumnNames()) return false } // Compare records for i, record := range q.Records { if !record.equals(other.Records[i]) { - log.Infof("Record %d is not equal", i) - log.Infof("Record 1: %v", record) - log.Infof("Record 2: 
%v", other.Records[i]) + fmt.Printf("Record %d is not equal\n", i) + fmt.Printf("Record 1: %v\n", record) + fmt.Printf("Record 2: %v\n", other.Records[i]) return false } }