diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 1a3b7fab6d..03c61d110c 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -261,7 +261,12 @@ func (c *BigQueryConnector) SetupMetadataTables() error { }, } if err := mirrorJobsTable.Create(c.ctx, mirrorJobsTableMetadata); err != nil { - return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err) + // if the table already exists, ignore the error + if !strings.Contains(err.Error(), "Already Exists") { + return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err) + } else { + log.Infof("table %s already exists", MirrorJobsTable) + } } return nil diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 400ecc3d01..d9111932a1 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -3,10 +3,12 @@ package e2e_bigquery import ( "context" "fmt" + "strings" "testing" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" + util "github.com/PeerDB-io/peer-flow/utils" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" @@ -16,26 +18,61 @@ import ( "go.temporal.io/sdk/testsuite" ) -const bigquerySuffix = "bigquery" - type PeerFlowE2ETestSuiteBQ struct { suite.Suite testsuite.WorkflowTestSuite + bqSuffix string pool *pgxpool.Pool bqHelper *BigQueryTestHelper } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteBQ)) + s := &PeerFlowE2ETestSuiteBQ{} + s.SetT(t) + s.SetupSuite() + + tests := []struct { + name string + test func(t *testing.T) + }{ + {"Test_Invalid_Connection_Config", s.Test_Invalid_Connection_Config}, + {"Test_Complete_Flow_No_Data", s.Test_Complete_Flow_No_Data}, + {"Test_Char_ColType_Error", s.Test_Char_ColType_Error}, + {"Test_Complete_Simple_Flow_BQ", s.Test_Complete_Simple_Flow_BQ}, + {"Test_Toast_BQ", s.Test_Toast_BQ}, + {"Test_Toast_Nochanges_BQ", s.Test_Toast_Nochanges_BQ}, + {"Test_Toast_Advance_1_BQ", s.Test_Toast_Advance_1_BQ}, + {"Test_Toast_Advance_2_BQ", s.Test_Toast_Advance_2_BQ}, + {"Test_Toast_Advance_3_BQ", s.Test_Toast_Advance_3_BQ}, + {"Test_Types_BQ", s.Test_Types_BQ}, + {"Test_Types_Avro_BQ", s.Test_Types_Avro_BQ}, + {"Test_Simple_Flow_BQ_Avro_CDC", s.Test_Simple_Flow_BQ_Avro_CDC}, + {"Test_Multi_Table_BQ", s.Test_Multi_Table_BQ}, + {"Test_Simple_Schema_Changes_BQ", s.Test_Simple_Schema_Changes_BQ}, + {"Test_Composite_PKey_BQ", s.Test_Composite_PKey_BQ}, + {"Test_Composite_PKey_Toast_1_BQ", s.Test_Composite_PKey_Toast_1_BQ}, + {"Test_Composite_PKey_Toast_2_BQ", s.Test_Composite_PKey_Toast_2_BQ}, + } + + // Assert that there are no duplicate test names + testNames := make(map[string]bool) + for _, tt := range tests { + if testNames[tt.name] { + t.Fatalf("duplicate test name: %s", tt.name) + } + testNames[tt.name] = true + + t.Run(tt.name, tt.test) + } } func (s *PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", bigquerySuffix, tableName) + return fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tableName) } func (s *PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, bigquerySuffix) + return fmt.Sprintf("%s_%s", input, s.bqSuffix) } // setupBigQuery sets up the bigquery connection. @@ -54,6 +91,14 @@ func (s *PeerFlowE2ETestSuiteBQ) setupBigQuery() error { return nil } +func (s *PeerFlowE2ETestSuiteBQ) setupTemporalLogger() { + logger := log.New() + logger.SetReportCaller(true) + logger.SetLevel(log.WarnLevel) + tlogger := e2e.NewTLogrusLogger(logger) + s.SetLogger(tlogger) +} + // Implement SetupAllSuite interface to setup the test suite func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { err := godotenv.Load() @@ -64,8 +109,12 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { } log.SetReportCaller(true) + log.SetLevel(log.WarnLevel) - pool, err := e2e.SetupPostgres(bigquerySuffix) + s.setupTemporalLogger() + + s.bqSuffix = strings.ToLower(util.RandomString(8)) + pool, err := e2e.SetupPostgres(s.bqSuffix) if err != nil { s.Fail("failed to setup postgres", err) } @@ -79,7 +128,7 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { // Implement TearDownAllSuite interface to tear down the test suite func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, bigquerySuffix) + err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { s.Fail("failed to drop Postgres schema", err) } @@ -90,7 +139,8 @@ func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { } } -func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -113,7 +163,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -127,7 +178,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { value VARCHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_flow_no_data"), @@ -137,7 +188,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -157,7 +208,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -171,7 +223,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { value CHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_char_table"), @@ -181,7 +233,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -204,7 +256,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -218,7 +271,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_simple_flow"), @@ -228,7 +281,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -246,7 +299,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -262,7 +315,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { s.Contains(err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - s.NoError(err) + require.NoError(t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -271,7 +324,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -284,12 +338,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_1"), @@ -299,7 +350,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -324,7 +375,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -342,7 +393,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -355,12 +407,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_2"), @@ -370,7 +419,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -388,7 +437,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -406,7 +455,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -419,12 +469,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_3"), @@ -434,7 +481,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -465,7 +512,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -483,7 +530,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -495,12 +543,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { id SERIAL PRIMARY KEY, t1 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_4"), @@ -510,7 +555,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -535,7 +580,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -553,7 +598,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -566,12 +612,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_5"), @@ -581,7 +624,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -605,7 +648,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -623,7 +666,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -637,16 +681,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_bq"), @@ -656,7 +692,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ @@ -683,7 +719,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { ARRAY[0.0003, 8902.0092], ARRAY['hello','bye']; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert with all types") }() @@ -710,7 +746,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -724,16 +761,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_avro_bq"), @@ -745,7 +774,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ @@ -772,7 +801,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { ARRAY[0.0003, 1039.0034], ARRAY['hello','bye']; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert with all types") }() @@ -799,7 +828,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -813,7 +843,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow_bq_avro_cdc"), TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -824,7 +854,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -839,7 +869,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -855,7 +885,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { s.Contains(err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - s.NoError(err) + require.NoError(t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -864,7 +894,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -877,7 +908,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s(id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table_bq"), @@ -887,7 +918,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -903,7 +934,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert on two tables") }() @@ -914,9 +945,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { err = env.GetWorkflowError() count1, err := s.bqHelper.countRows(dstTable1Name) - s.NoError(err) + require.NoError(t, err) count2, err := s.bqHelper.countRows(dstTable2Name) - s.NoError(err) + require.NoError(t, err) s.Equal(1, count1) s.Equal(1, count2) @@ -925,7 +956,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { } // TODO: not checking schema exactly, add later -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -938,7 +970,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -948,7 +980,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -962,7 +994,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -972,11 +1004,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -986,11 +1018,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1000,11 +1032,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1025,7 +1057,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1041,7 +1074,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -1051,7 +1084,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1068,7 +1101,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") @@ -1078,9 +1111,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1098,7 +1131,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1113,12 +1147,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1128,7 +1159,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1140,7 +1171,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1148,18 +1179,18 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1178,7 +1209,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1193,12 +1225,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1208,7 +1237,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1226,16 +1255,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 5e6374cc1a..8bd4b6135f 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -11,9 +11,9 @@ import ( ) func (s *PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateSourceTableQRep(s.pool, bigquerySuffix, tableName) + err := e2e.CreateSourceTableQRep(s.pool, s.bqSuffix, tableName) s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, bigquerySuffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.pool, s.bqSuffix, tableName, rowCount) s.NoError(err) } @@ -33,7 +33,7 @@ func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsSt pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", colsString, bigquerySuffix, tableName), + fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", colsString, s.bqSuffix, tableName), ) s.NoError(err) @@ -58,10 +58,10 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { s.setupBQDestinationTable(tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - bigquerySuffix, tblName) + s.bqSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro", - fmt.Sprintf("e2e_test_%s.%s", bigquerySuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index c5228258b0..a5e51074ad 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -94,6 +94,24 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) { return nil, fmt.Errorf("failed to create e2e_test schema: %w", err) } + _, err = pool.Exec(context.Background(), ` + CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ + SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', + round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); + $$ language sql; + CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) + RETURNS bytea AS $body$ + SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') + FROM generate_series(1, $1); + $body$ + LANGUAGE 'sql' + VOLATILE + SET search_path = 'pg_catalog'; + `) + if err != nil { + return nil, fmt.Errorf("failed to create utility functions: %w", err) + } + return pool, nil } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 1a2d111513..7bccc6cfa8 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -120,24 +120,6 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { } s.pool = pool - _, err = s.pool.Exec(context.Background(), ` - CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; - `) - if err != nil { - s.Fail("failed to setup random_string and random_bytea functions", err) - } - err = s.setupSnowflake() if err != nil { s.Fail("failed to setup snowflake", err)