From 84689b3417cfac4f3403658b7a0f7a0f3ac7b1d7 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 10 Nov 2023 13:52:19 -0500 Subject: [PATCH 1/8] make some of our snowflake tests run in parallel --- flow/e2e/snowflake/peer_flow_sf_test.go | 402 ++++++++++++++---------- flow/e2e/snowflake/qrep_flow_sf_test.go | 24 +- flow/e2e/test_utils.go | 48 ++- 3 files changed, 295 insertions(+), 179 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 9f92496eb3..1a2d111513 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -5,40 +5,76 @@ import ( "fmt" "strings" "testing" + "time" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" + util "github.com/PeerDB-io/peer-flow/utils" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.temporal.io/sdk/testsuite" ) -const snowflakeSuffix = "snowflake" - type PeerFlowE2ETestSuiteSF struct { suite.Suite testsuite.WorkflowTestSuite + pgSuffix string pool *pgxpool.Pool sfHelper *SnowflakeTestHelper connector *connsnowflake.SnowflakeConnector } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteSF)) + s := &PeerFlowE2ETestSuiteSF{} + s.SetT(t) + s.SetupSuite() + + tests := []struct { + name string + test func(t *testing.T) + }{ + {"Test_Complete_Simple_Flow_SF", s.Test_Complete_Simple_Flow_SF}, + {"Test_Complete_Simple_Flow_SF_Avro_CDC", s.Test_Complete_Simple_Flow_SF_Avro_CDC}, + {"Test_Invalid_Geo_SF_Avro_CDC", s.Test_Invalid_Geo_SF_Avro_CDC}, + {"Test_Toast_SF", s.Test_Toast_SF}, + {"Test_Toast_Nochanges_SF", s.Test_Toast_Nochanges_SF}, + {"Test_Toast_Advance_1_SF", s.Test_Toast_Advance_1_SF}, + {"Test_Toast_Advance_2_SF", s.Test_Toast_Advance_2_SF}, + {"Test_Toast_Advance_3_SF", s.Test_Toast_Advance_3_SF}, + {"Test_Types_SF", s.Test_Types_SF}, + {"Test_Types_SF_Avro_CDC", s.Test_Types_SF_Avro_CDC}, + {"Test_Multi_Table_SF", s.Test_Multi_Table_SF}, + {"Test_Simple_Schema_Changes_SF", s.Test_Simple_Schema_Changes_SF}, + {"Test_Composite_PKey_SF", s.Test_Composite_PKey_SF}, + {"Test_Composite_PKey_Toast_1_SF", s.Test_Composite_PKey_Toast_1_SF}, + {"Test_Composite_PKey_Toast_2_SF", s.Test_Composite_PKey_Toast_2_SF}, + } + + // assert that there are no duplicate test names + testNames := make(map[string]bool) + for _, tt := range tests { + if testNames[tt.name] { + t.Fatalf("duplicate test name: %s", tt.name) + } + testNames[tt.name] = true + + t.Run(tt.name, tt.test) + } } func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tableName) + return fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tableName) } func (s *PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, snowflakeSuffix) + return fmt.Sprintf("%s_%s", input, s.pgSuffix) } // setupSnowflake sets up the snowflake connection. @@ -53,6 +89,14 @@ func (s *PeerFlowE2ETestSuiteSF) setupSnowflake() error { return nil } +func (s *PeerFlowE2ETestSuiteSF) setupTemporalLogger() { + logger := log.New() + logger.SetReportCaller(true) + logger.SetLevel(log.WarnLevel) + tlogger := e2e.NewTLogrusLogger(logger) + s.SetLogger(tlogger) +} + func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { err := godotenv.Load() if err != nil { @@ -62,13 +106,38 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { } log.SetReportCaller(true) + log.SetLevel(log.WarnLevel) + + s.setupTemporalLogger() - pool, err := e2e.SetupPostgres(snowflakeSuffix) + suffix := util.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + s.pgSuffix = fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) + + pool, err := e2e.SetupPostgres(s.pgSuffix) if err != nil { s.Fail("failed to setup postgres", err) } s.pool = pool + _, err = s.pool.Exec(context.Background(), ` + CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ + SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', + round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); + $$ language sql; + CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) + RETURNS bytea AS $body$ + SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') + FROM generate_series(1, $1); + $body$ + LANGUAGE 'sql' + VOLATILE + SET search_path = 'pg_catalog'; + `) + if err != nil { + s.Fail("failed to setup random_string and random_bytea functions", err) + } + err = s.setupSnowflake() if err != nil { s.Fail("failed to setup snowflake", err) @@ -76,12 +145,12 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { s.connector, err = connsnowflake.NewSnowflakeConnector(context.Background(), s.sfHelper.Config) - s.NoError(err) + require.NoError(s.T(), err) } // Implement TearDownAllSuite interface to tear down the test suite func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, snowflakeSuffix) + err := e2e.TearDownPostgres(s.pool, s.pgSuffix) if err != nil { s.Fail("failed to drop Postgres schema", err) } @@ -94,10 +163,12 @@ func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { } err = s.connector.Close() - s.NoError(err) + require.NoError(s.T(), err) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -105,13 +176,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) + connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -120,7 +192,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -138,7 +210,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -154,7 +226,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { s.Contains(err.Error(), "continue as new") count, err := s.sfHelper.CountRows("test_simple_flow_sf") - s.NoError(err) + require.NoError(t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -163,21 +235,24 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) - srcTableName := s.attachSchemaSuffix("test_simple_flow_sf_avro_cdc") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf_avro_cdc") + tblConst := "test_simple_flow_sf_avro_cdc" + srcTableName := s.attachSchemaSuffix(tblConst) + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblConst) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow_avro"), @@ -188,7 +263,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -200,30 +275,30 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := 0; i < 15; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } - fmt.Println("Inserted 10 rows into the source table") + fmt.Println("Inserted 15 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error s.Error(err) s.Contains(err.Error(), "continue as new") - count, err := s.sfHelper.CountRows("test_simple_flow_sf_avro_cdc") - s.NoError(err) - s.Equal(10, count) + count, err := s.sfHelper.CountRows(tblConst) + require.NoError(t, err) + s.Equal(15, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side @@ -231,7 +306,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -239,13 +316,13 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_invalid_geo_sf_avro_cdc") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, line GEOMETRY(LINESTRING) NOT NULL, poly GEOGRAPHY(POLYGON) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"), @@ -256,7 +333,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -277,7 +354,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", ) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { @@ -287,7 +364,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ "00f03f000000000000000000000000000000000000000000000000") - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 6 valid geography rows and 10 total rows into source") }() @@ -305,11 +382,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { // We inserted 4 invalid shapes in each. // They should have filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") - s.NoError(err) + require.NoError(t, err) s.Equal(6, lineCount) polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly") - s.NoError(err) + require.NoError(t, err) s.Equal(6, polyCount) // TODO: verify that the data is correctly synced to the destination table @@ -318,7 +395,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -326,17 +405,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_1") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_1"), @@ -346,7 +422,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -371,7 +447,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -389,7 +465,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -397,17 +475,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_2") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( - id SERIAL PRIMARY KEY, - t1 text, - t2 text, - k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - `, srcTableName)) - s.NoError(err) + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + t1 text, + t2 text, + k int + ); +`, srcTableName, srcTableName)) + log.Infof("Creating table '%s', err: %v", srcTableName, err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_2"), @@ -417,7 +494,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -435,7 +512,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -453,7 +530,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -461,17 +540,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_3") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - `, srcTableName)) - s.NoError(err) + ); + `, srcTableName, srcTableName)) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_3"), @@ -481,7 +558,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -512,7 +589,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -530,7 +607,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -538,16 +617,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_4") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - `, srcTableName)) - s.NoError(err) + ); + `, srcTableName, srcTableName)) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_4"), @@ -557,7 +634,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -582,7 +659,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -600,7 +677,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -608,17 +687,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_5") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - `, srcTableName)) - s.NoError(err) + ); +`, srcTableName, srcTableName)) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_5"), @@ -628,7 +705,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -652,7 +729,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -670,7 +747,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -678,23 +757,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; - `, srcTableName)) - s.NoError(err) + `, srcTableName, srcTableName)) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -704,7 +776,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -730,7 +802,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert with all types") }() @@ -757,7 +829,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -765,23 +839,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf_avro_cdc") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, + CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -792,7 +858,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -818,7 +884,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert with all types") }() @@ -845,7 +911,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -855,10 +923,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { dstTable2Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test2_sf") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s (id serial primary key, c1 int, c2 text); - CREATE TABLE %s (id serial primary key, c1 int, c2 text); + CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); + CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table"), @@ -868,7 +936,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -884,7 +952,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert with all types") }() @@ -895,9 +963,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") - s.NoError(err) + require.NoError(t, err) count2, err := s.sfHelper.CountRows("test2_sf") - s.NoError(err) + require.NoError(t, err) s.Equal(1, count1) s.Equal(1, count2) @@ -905,7 +973,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -918,7 +988,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -928,7 +998,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -942,7 +1012,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -958,18 +1028,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -986,18 +1056,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1015,18 +1085,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1044,7 +1114,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) }() @@ -1062,7 +1132,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1078,7 +1150,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -1088,7 +1160,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 5, @@ -1105,7 +1177,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") @@ -1115,9 +1187,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1136,7 +1208,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1151,12 +1225,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1166,7 +1237,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1178,7 +1249,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1186,18 +1257,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1216,7 +1287,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1231,12 +1304,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1246,7 +1316,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1264,16 +1334,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index cdcfaeca98..82901beac2 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -12,9 +12,9 @@ import ( ) func (s *PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateSourceTableQRep(s.pool, snowflakeSuffix, tableName) + err := e2e.CreateSourceTableQRep(s.pool, s.pgSuffix, tableName) s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, snowflakeSuffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.pool, s.pgSuffix, tableName, rowCount) s.NoError(err) } @@ -35,12 +35,12 @@ func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, select pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, snowflakeSuffix, tableName), + fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, s.pgSuffix, tableName), ) require.NoError(s.T(), err) // read rows from destination table - qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) + qualifiedTableName := fmt.Sprintf("%s.%s.%s", s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) var sfSelQuery string if caseSensitive { sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY "id"`, selector, qualifiedTableName) @@ -68,11 +68,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_sf", - fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, @@ -109,11 +109,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_sf", - fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, @@ -154,7 +154,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_sf", @@ -194,11 +194,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_sf_xmin", - fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, @@ -239,7 +239,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) sfPeer := s.sfHelper.Peer sfPeer.GetSnowflakeConfig().S3Integration = "peerdb_s3_integration" diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 4f53e9a6a4..dfa06a44c2 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -233,7 +233,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro deal_id, ethereum_transaction_id, ignore_price, card_eth_value, paid_eth_price, card_bought_notified, address, account_id, asset_id, status, transaction_id, settled_at, reference_id, - settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8 + settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8 %s ) VALUES %s; `, suffix, tableName, geoColumns, strings.Join(rows, ","))) @@ -357,3 +357,49 @@ func GetOwnersSelectorString() string { } return strings.Join(fields, ",") } + +// implement temporal logger interface with logrus +// +// type Logger interface { +// Debug(msg string, keyvals ...interface{}) +// Info(msg string, keyvals ...interface{}) +// Warn(msg string, keyvals ...interface{}) +// Error(msg string, keyvals ...interface{}) +// } +type TLogrusLogger struct { + logger *log.Logger +} + +func NewTLogrusLogger(logger *log.Logger) *TLogrusLogger { + return &TLogrusLogger{logger: logger} +} + +func (l *TLogrusLogger) keyvalsToFields(keyvals []interface{}) log.Fields { + fields := make(log.Fields) + for i := 0; i < len(keyvals); i += 2 { + key := fmt.Sprintf("%v", keyvals[i]) + if i+1 < len(keyvals) { + fields[key] = keyvals[i+1] + } else { + // Handle the case where there is no value for the key + fields[key] = nil // or some default value + } + } + return fields +} + +func (l *TLogrusLogger) Debug(msg string, keyvals ...interface{}) { + l.logger.WithFields(l.keyvalsToFields(keyvals)).Debug(msg) +} + +func (l *TLogrusLogger) Info(msg string, keyvals ...interface{}) { + l.logger.WithFields(l.keyvalsToFields(keyvals)).Info(msg) +} + +func (l *TLogrusLogger) Warn(msg string, keyvals ...interface{}) { + l.logger.WithFields(l.keyvalsToFields(keyvals)).Warn(msg) +} + +func (l *TLogrusLogger) Error(msg string, keyvals ...interface{}) { + l.logger.WithFields(l.keyvalsToFields(keyvals)).Error(msg) +} From 80a2bf0bb91b430474693ab7601a3da7720eb7fc Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 10 Nov 2023 13:57:29 -0500 Subject: [PATCH 2/8] increase parallelism --- .github/workflows/flow.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index d1a61dbcb7..e4edd44160 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -41,12 +41,12 @@ jobs: - name: install gotestsum run: | go install gotest.tools/gotestsum@latest - + - name: install lib-geos run: | sudo apt-get update sudo apt-get install libgeos-dev - + - name: download go modules run: | go mod download @@ -65,14 +65,14 @@ jobs: with: name: "snowflake_creds.json" json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }} - + - name: setup S3 credentials id: s3-credentials uses: jsdaniell/create-json@v1.2.2 with: name: "s3_creds.json" json: ${{ secrets.S3_CREDS }} - + - name: setup GCS credentials id: gcs-credentials uses: jsdaniell/create-json@v1.2.2 @@ -94,7 +94,7 @@ jobs: - name: run tests run: | - gotestsum --format testname -- -p 4 ./... -timeout 2400s + gotestsum --format testname -- -p 16 ./... -timeout 2400s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} From 529e8ae2e433361d8b6f24196e9e676c49a9328b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 10 Nov 2023 14:52:17 -0500 Subject: [PATCH 3/8] fix sql server suffix --- flow/e2e/sqlserver/qrep_flow_sqlserver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index aad017c01a..9c2b27bcb0 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -19,7 +19,7 @@ import ( "go.temporal.io/sdk/testsuite" ) -const sqlserverSuffix = "s3" +const sqlserverSuffix = "sqlserver" type PeerFlowE2ETestSuiteSQLServer struct { suite.Suite From cffa3c6c86905de69fb98ba8bff9119044e02e39 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 10 Nov 2023 15:18:51 -0500 Subject: [PATCH 4/8] make bq tests faster too --- flow/connectors/bigquery/bigquery.go | 7 +- flow/e2e/bigquery/peer_flow_bq_test.go | 293 +++++++++++++----------- flow/e2e/bigquery/qrep_flow_bq_test.go | 10 +- flow/e2e/congen.go | 18 ++ flow/e2e/snowflake/peer_flow_sf_test.go | 18 -- 5 files changed, 190 insertions(+), 156 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 4f6eb35ab4..82e16282fc 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -261,7 +261,12 @@ func (c *BigQueryConnector) SetupMetadataTables() error { }, } if err := mirrorJobsTable.Create(c.ctx, mirrorJobsTableMetadata); err != nil { - return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err) + // if the table already exists, ignore the error + if !strings.Contains(err.Error(), "Already Exists") { + return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err) + } else { + log.Infof("table %s already exists", MirrorJobsTable) + } } return nil diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 400ecc3d01..d9111932a1 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -3,10 +3,12 @@ package e2e_bigquery import ( "context" "fmt" + "strings" "testing" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" + util "github.com/PeerDB-io/peer-flow/utils" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" @@ -16,26 +18,61 @@ import ( "go.temporal.io/sdk/testsuite" ) -const bigquerySuffix = "bigquery" - type PeerFlowE2ETestSuiteBQ struct { suite.Suite testsuite.WorkflowTestSuite + bqSuffix string pool *pgxpool.Pool bqHelper *BigQueryTestHelper } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteBQ)) + s := &PeerFlowE2ETestSuiteBQ{} + s.SetT(t) + s.SetupSuite() + + tests := []struct { + name string + test func(t *testing.T) + }{ + {"Test_Invalid_Connection_Config", s.Test_Invalid_Connection_Config}, + {"Test_Complete_Flow_No_Data", s.Test_Complete_Flow_No_Data}, + {"Test_Char_ColType_Error", s.Test_Char_ColType_Error}, + {"Test_Complete_Simple_Flow_BQ", s.Test_Complete_Simple_Flow_BQ}, + {"Test_Toast_BQ", s.Test_Toast_BQ}, + {"Test_Toast_Nochanges_BQ", s.Test_Toast_Nochanges_BQ}, + {"Test_Toast_Advance_1_BQ", s.Test_Toast_Advance_1_BQ}, + {"Test_Toast_Advance_2_BQ", s.Test_Toast_Advance_2_BQ}, + {"Test_Toast_Advance_3_BQ", s.Test_Toast_Advance_3_BQ}, + {"Test_Types_BQ", s.Test_Types_BQ}, + {"Test_Types_Avro_BQ", s.Test_Types_Avro_BQ}, + {"Test_Simple_Flow_BQ_Avro_CDC", s.Test_Simple_Flow_BQ_Avro_CDC}, + {"Test_Multi_Table_BQ", s.Test_Multi_Table_BQ}, + {"Test_Simple_Schema_Changes_BQ", s.Test_Simple_Schema_Changes_BQ}, + {"Test_Composite_PKey_BQ", s.Test_Composite_PKey_BQ}, + {"Test_Composite_PKey_Toast_1_BQ", s.Test_Composite_PKey_Toast_1_BQ}, + {"Test_Composite_PKey_Toast_2_BQ", s.Test_Composite_PKey_Toast_2_BQ}, + } + + // Assert that there are no duplicate test names + testNames := make(map[string]bool) + for _, tt := range tests { + if testNames[tt.name] { + t.Fatalf("duplicate test name: %s", tt.name) + } + testNames[tt.name] = true + + t.Run(tt.name, tt.test) + } } func (s *PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", bigquerySuffix, tableName) + return fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tableName) } func (s *PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, bigquerySuffix) + return fmt.Sprintf("%s_%s", input, s.bqSuffix) } // setupBigQuery sets up the bigquery connection. @@ -54,6 +91,14 @@ func (s *PeerFlowE2ETestSuiteBQ) setupBigQuery() error { return nil } +func (s *PeerFlowE2ETestSuiteBQ) setupTemporalLogger() { + logger := log.New() + logger.SetReportCaller(true) + logger.SetLevel(log.WarnLevel) + tlogger := e2e.NewTLogrusLogger(logger) + s.SetLogger(tlogger) +} + // Implement SetupAllSuite interface to setup the test suite func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { err := godotenv.Load() @@ -64,8 +109,12 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { } log.SetReportCaller(true) + log.SetLevel(log.WarnLevel) - pool, err := e2e.SetupPostgres(bigquerySuffix) + s.setupTemporalLogger() + + s.bqSuffix = strings.ToLower(util.RandomString(8)) + pool, err := e2e.SetupPostgres(s.bqSuffix) if err != nil { s.Fail("failed to setup postgres", err) } @@ -79,7 +128,7 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { // Implement TearDownAllSuite interface to tear down the test suite func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, bigquerySuffix) + err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { s.Fail("failed to drop Postgres schema", err) } @@ -90,7 +139,8 @@ func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { } } -func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -113,7 +163,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -127,7 +178,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { value VARCHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_flow_no_data"), @@ -137,7 +188,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -157,7 +208,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -171,7 +223,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { value CHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_char_table"), @@ -181,7 +233,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -204,7 +256,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -218,7 +271,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_simple_flow"), @@ -228,7 +281,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -246,7 +299,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -262,7 +315,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { s.Contains(err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - s.NoError(err) + require.NoError(t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -271,7 +324,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -284,12 +338,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_1"), @@ -299,7 +350,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -324,7 +375,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -342,7 +393,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -355,12 +407,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_2"), @@ -370,7 +419,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -388,7 +437,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -406,7 +455,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -419,12 +469,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_3"), @@ -434,7 +481,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -465,7 +512,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -483,7 +530,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -495,12 +543,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { id SERIAL PRIMARY KEY, t1 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_4"), @@ -510,7 +555,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -535,7 +580,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -553,7 +598,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -566,12 +612,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_5"), @@ -581,7 +624,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -605,7 +648,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -623,7 +666,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -637,16 +681,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_bq"), @@ -656,7 +692,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ @@ -683,7 +719,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { ARRAY[0.0003, 8902.0092], ARRAY['hello','bye']; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert with all types") }() @@ -710,7 +746,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -724,16 +761,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_avro_bq"), @@ -745,7 +774,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ @@ -772,7 +801,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { ARRAY[0.0003, 1039.0034], ARRAY['hello','bye']; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert with all types") }() @@ -799,7 +828,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -813,7 +843,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow_bq_avro_cdc"), TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -824,7 +854,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -839,7 +869,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -855,7 +885,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { s.Contains(err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - s.NoError(err) + require.NoError(t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -864,7 +894,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -877,7 +908,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s(id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table_bq"), @@ -887,7 +918,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -903,7 +934,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert on two tables") }() @@ -914,9 +945,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { err = env.GetWorkflowError() count1, err := s.bqHelper.countRows(dstTable1Name) - s.NoError(err) + require.NoError(t, err) count2, err := s.bqHelper.countRows(dstTable2Name) - s.NoError(err) + require.NoError(t, err) s.Equal(1, count1) s.Equal(1, count2) @@ -925,7 +956,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { } // TODO: not checking schema exactly, add later -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -938,7 +970,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -948,7 +980,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -962,7 +994,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -972,11 +1004,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -986,11 +1018,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1000,11 +1032,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1025,7 +1057,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1041,7 +1074,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -1051,7 +1084,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1068,7 +1101,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") @@ -1078,9 +1111,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1098,7 +1131,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1113,12 +1147,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1128,7 +1159,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1140,7 +1171,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1148,18 +1179,18 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1178,7 +1209,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1193,12 +1225,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1208,7 +1237,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1226,16 +1255,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 5e6374cc1a..8bd4b6135f 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -11,9 +11,9 @@ import ( ) func (s *PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateSourceTableQRep(s.pool, bigquerySuffix, tableName) + err := e2e.CreateSourceTableQRep(s.pool, s.bqSuffix, tableName) s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, bigquerySuffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.pool, s.bqSuffix, tableName, rowCount) s.NoError(err) } @@ -33,7 +33,7 @@ func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsSt pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", colsString, bigquerySuffix, tableName), + fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", colsString, s.bqSuffix, tableName), ) s.NoError(err) @@ -58,10 +58,10 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { s.setupBQDestinationTable(tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - bigquerySuffix, tblName) + s.bqSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro", - fmt.Sprintf("e2e_test_%s.%s", bigquerySuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index c5228258b0..a5e51074ad 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -94,6 +94,24 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) { return nil, fmt.Errorf("failed to create e2e_test schema: %w", err) } + _, err = pool.Exec(context.Background(), ` + CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ + SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', + round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); + $$ language sql; + CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) + RETURNS bytea AS $body$ + SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') + FROM generate_series(1, $1); + $body$ + LANGUAGE 'sql' + VOLATILE + SET search_path = 'pg_catalog'; + `) + if err != nil { + return nil, fmt.Errorf("failed to create utility functions: %w", err) + } + return pool, nil } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 1a2d111513..7bccc6cfa8 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -120,24 +120,6 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { } s.pool = pool - _, err = s.pool.Exec(context.Background(), ` - CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; - `) - if err != nil { - s.Fail("failed to setup random_string and random_bytea functions", err) - } - err = s.setupSnowflake() if err != nil { s.Fail("failed to setup snowflake", err) From d9225551fed551afb07be99a5cb81805276bcf9b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 10 Nov 2023 15:22:00 -0500 Subject: [PATCH 5/8] remove some printlns --- flow/e2e/bigquery/peer_flow_bq_test.go | 2 -- flow/e2e/snowflake/peer_flow_sf_test.go | 3 --- flow/e2e/test_utils.go | 1 - 3 files changed, 6 deletions(-) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index d9111932a1..fe8bee3d6b 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -720,7 +720,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { ARRAY['hello','bye']; `, srcTableName)) require.NoError(t, err) - fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -802,7 +801,6 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { ARRAY['hello','bye']; `, srcTableName)) require.NoError(t, err) - fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 7bccc6cfa8..163d61447e 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -785,7 +785,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) require.NoError(t, err) - fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -867,7 +866,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) require.NoError(t, err) - fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -935,7 +933,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) require.NoError(t, err) - fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index dfa06a44c2..f26afc2ee1 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -72,7 +72,6 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, } if state.SetupComplete { - fmt.Println("query indicates setup is complete") break } } else { From 54bc55bbc27693f41533fe882437c516d56c7073 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 10 Nov 2023 15:45:24 -0500 Subject: [PATCH 6/8] more fixes --- flow/e2e/congen.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index a5e51074ad..fa0ca8e40e 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -95,6 +95,7 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) { } _, err = pool.Exec(context.Background(), ` + SELECT pg_advisory_lock(hashtext('peerdb_pg_setup_lock')); CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); From 8fbf1d2f75634570d14aa7ab99fca9d44449c2d3 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sat, 11 Nov 2023 11:50:14 -0500 Subject: [PATCH 7/8] use advisory locks to prevent concurrent updates on the mirror jobs --- .github/workflows/flow.yml | 8 ++- flow/connectors/bigquery/bigquery.go | 82 +++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index e4edd44160..c1ac28252c 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -86,6 +86,7 @@ jobs: docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;" docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;" docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;" + docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_connections=1024;" docker restart pg_cdc working-directory: ./flow env: @@ -94,7 +95,7 @@ jobs: - name: run tests run: | - gotestsum --format testname -- -p 16 ./... -timeout 2400s + gotestsum --format testname -- -p 32 ./... -timeout 2400s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} @@ -114,3 +115,8 @@ jobs: SQLSERVER_USER: ${{ secrets.SQLSERVER_USER }} SQLSERVER_PASSWORD: ${{ secrets.SQLSERVER_PASSWORD }} SQLSERVER_DB: ${{ secrets.SQLSERVER_DB }} + PEERDB_CATALOG_HOST: localhost + PEERDB_CATALOG_PORT: 7132 + PEERDB_CATALOG_USER: postgres + PEERDB_CATALOG_PASSWORD: postgres + PEERDB_CATALOG_DATABASE: postgres diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 82e16282fc..7e9089ee95 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -13,10 +13,12 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/storage" "github.com/PeerDB-io/peer-flow/connectors/utils" + cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" "google.golang.org/api/iterator" @@ -64,6 +66,7 @@ type BigQueryConnector struct { storageClient *storage.Client tableNameSchemaMapping map[string]*protos.TableSchema datasetID string + catalogPool *pgxpool.Pool } type StagingBQRecord struct { @@ -177,12 +180,18 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* return nil, fmt.Errorf("failed to create Storage client: %v", err) } + catalogPool, err := cc.GetCatalogConnectionPoolFromEnv() + if err != nil { + return nil, fmt.Errorf("failed to create catalog connection pool: %v", err) + } + return &BigQueryConnector{ ctx: ctx, bqConfig: config, client: client, datasetID: datasetID, storageClient: storageClient, + catalogPool: catalogPool, }, nil } @@ -614,6 +623,18 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, return nil, fmt.Errorf("failed to get last checkpoint: %v", err) } + release, err := c.grabJobsUpdateLock() + if err != nil { + return nil, fmt.Errorf("failed to grab jobs update lock: %v", err) + } + + defer func() { + err := release() + if err != nil { + log.Errorf("failed to release jobs update lock: %v", err) + } + }() + // we have to do the following things in a transaction // 1. append the records in the staging table to the raw table. // 2. execute the update metadata query to store the last committed watermark. @@ -916,6 +937,18 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) // append all the statements to one list log.Printf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames) + release, err := c.grabJobsUpdateLock() + if err != nil { + return nil, fmt.Errorf("failed to grab lock: %v", err) + } + + defer func() { + err := release() + if err != nil { + log.Errorf("failed to release lock: %v", err) + } + }() + stmts = append(stmts, "BEGIN TRANSACTION;") for _, tableName := range distinctTableNames { @@ -1134,9 +1167,21 @@ func (c *BigQueryConnector) SetupNormalizedTables( } func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { + release, err := c.grabJobsUpdateLock() + if err != nil { + return fmt.Errorf("failed to grab lock: %w", err) + } + + defer func() { + err := release() + if err != nil { + log.Printf("failed to release lock: %v", err) + } + }() + dataset := c.client.Dataset(c.datasetID) // deleting PeerDB specific tables - err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) + err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) if err != nil { return fmt.Errorf("failed to delete raw table: %w", err) } @@ -1185,11 +1230,44 @@ func (c *BigQueryConnector) truncateTable(tableIdentifier string) error { return nil } +// Bigquery doesn't allow concurrent updates to the same table. +// we grab a lock on catalog to ensure that only one job is updating +// bigquery tables at a time. +// returns a function to release the lock. +func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { + tx, err := c.catalogPool.Begin(c.ctx) + if err != nil { + return nil, fmt.Errorf("failed to begin transaction: %w", err) + } + + // grab an advisory lock based on the mirror jobs table hash + mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) + _, err = tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl) + + if err != nil { + err = tx.Rollback(c.ctx) + return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) + } + + return func() error { + // release the lock + _, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl) + if err != nil { + return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err) + } + + err = tx.Commit(c.ctx) + if err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + return nil + }, nil +} + func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { for _, renameRequest := range req.RenameTableOptions { src := renameRequest.CurrentName dst := renameRequest.NewName - log.WithFields(log.Fields{ "flowName": req.FlowJobName, }).Infof("renaming table '%s' to '%s'...", src, dst) From 624236ffe45dd6ab16c3ec7392d00aef1b31500b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sat, 11 Nov 2023 12:16:46 -0500 Subject: [PATCH 8/8] maybe faster --- .github/workflows/flow.yml | 1 + flow/e2e/bigquery/peer_flow_bq_test.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index c1ac28252c..a0a1889838 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -120,3 +120,4 @@ jobs: PEERDB_CATALOG_USER: postgres PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres + PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3 diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index fe8bee3d6b..be7f45ef4a 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -1085,7 +1085,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -1238,7 +1238,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, + TotalSyncFlows: 2, MaxBatchSize: 100, }