diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index d1a61dbcb7..a0a1889838 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -41,12 +41,12 @@ jobs: - name: install gotestsum run: | go install gotest.tools/gotestsum@latest - + - name: install lib-geos run: | sudo apt-get update sudo apt-get install libgeos-dev - + - name: download go modules run: | go mod download @@ -65,14 +65,14 @@ jobs: with: name: "snowflake_creds.json" json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }} - + - name: setup S3 credentials id: s3-credentials uses: jsdaniell/create-json@v1.2.2 with: name: "s3_creds.json" json: ${{ secrets.S3_CREDS }} - + - name: setup GCS credentials id: gcs-credentials uses: jsdaniell/create-json@v1.2.2 @@ -86,6 +86,7 @@ jobs: docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;" docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;" docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;" + docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_connections=1024;" docker restart pg_cdc working-directory: ./flow env: @@ -94,7 +95,7 @@ jobs: - name: run tests run: | - gotestsum --format testname -- -p 4 ./... -timeout 2400s + gotestsum --format testname -- -p 32 ./... -timeout 2400s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} @@ -114,3 +115,9 @@ jobs: SQLSERVER_USER: ${{ secrets.SQLSERVER_USER }} SQLSERVER_PASSWORD: ${{ secrets.SQLSERVER_PASSWORD }} SQLSERVER_DB: ${{ secrets.SQLSERVER_DB }} + PEERDB_CATALOG_HOST: localhost + PEERDB_CATALOG_PORT: 7132 + PEERDB_CATALOG_USER: postgres + PEERDB_CATALOG_PASSWORD: postgres + PEERDB_CATALOG_DATABASE: postgres + PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3 diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 4f6eb35ab4..7e9089ee95 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -13,10 +13,12 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/storage" "github.com/PeerDB-io/peer-flow/connectors/utils" + cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" "google.golang.org/api/iterator" @@ -64,6 +66,7 @@ type BigQueryConnector struct { storageClient *storage.Client tableNameSchemaMapping map[string]*protos.TableSchema datasetID string + catalogPool *pgxpool.Pool } type StagingBQRecord struct { @@ -177,12 +180,18 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* return nil, fmt.Errorf("failed to create Storage client: %v", err) } + catalogPool, err := cc.GetCatalogConnectionPoolFromEnv() + if err != nil { + return nil, fmt.Errorf("failed to create catalog connection pool: %v", err) + } + return &BigQueryConnector{ ctx: ctx, bqConfig: config, client: client, datasetID: datasetID, storageClient: storageClient, + catalogPool: catalogPool, }, nil } @@ -261,7 +270,12 @@ func (c *BigQueryConnector) SetupMetadataTables() error { }, } if err := mirrorJobsTable.Create(c.ctx, mirrorJobsTableMetadata); err != nil { - return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err) + // if the table already exists, ignore the error + if !strings.Contains(err.Error(), "Already Exists") { + return fmt.Errorf("failed to create table %s: %w", MirrorJobsTable, err) + } else { + log.Infof("table %s already exists", MirrorJobsTable) + } } return nil @@ -609,6 +623,18 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, return nil, fmt.Errorf("failed to get last checkpoint: %v", err) } + release, err := c.grabJobsUpdateLock() + if err != nil { + return nil, fmt.Errorf("failed to grab jobs update lock: %v", err) + } + + defer func() { + err := release() + if err != nil { + log.Errorf("failed to release jobs update lock: %v", err) + } + }() + // we have to do the following things in a transaction // 1. append the records in the staging table to the raw table. // 2. execute the update metadata query to store the last committed watermark. @@ -911,6 +937,18 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) // append all the statements to one list log.Printf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames) + release, err := c.grabJobsUpdateLock() + if err != nil { + return nil, fmt.Errorf("failed to grab lock: %v", err) + } + + defer func() { + err := release() + if err != nil { + log.Errorf("failed to release lock: %v", err) + } + }() + stmts = append(stmts, "BEGIN TRANSACTION;") for _, tableName := range distinctTableNames { @@ -1129,9 +1167,21 @@ func (c *BigQueryConnector) SetupNormalizedTables( } func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { + release, err := c.grabJobsUpdateLock() + if err != nil { + return fmt.Errorf("failed to grab lock: %w", err) + } + + defer func() { + err := release() + if err != nil { + log.Printf("failed to release lock: %v", err) + } + }() + dataset := c.client.Dataset(c.datasetID) // deleting PeerDB specific tables - err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) + err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) if err != nil { return fmt.Errorf("failed to delete raw table: %w", err) } @@ -1180,11 +1230,44 @@ func (c *BigQueryConnector) truncateTable(tableIdentifier string) error { return nil } +// Bigquery doesn't allow concurrent updates to the same table. +// we grab a lock on catalog to ensure that only one job is updating +// bigquery tables at a time. +// returns a function to release the lock. +func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { + tx, err := c.catalogPool.Begin(c.ctx) + if err != nil { + return nil, fmt.Errorf("failed to begin transaction: %w", err) + } + + // grab an advisory lock based on the mirror jobs table hash + mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) + _, err = tx.Exec(c.ctx, "SELECT pg_advisory_lock(hashtext($1))", mjTbl) + + if err != nil { + err = tx.Rollback(c.ctx) + return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) + } + + return func() error { + // release the lock + _, err := tx.Exec(c.ctx, "SELECT pg_advisory_unlock(hashtext($1))", mjTbl) + if err != nil { + return fmt.Errorf("failed to release lock on %s: %w", mjTbl, err) + } + + err = tx.Commit(c.ctx) + if err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + return nil + }, nil +} + func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { for _, renameRequest := range req.RenameTableOptions { src := renameRequest.CurrentName dst := renameRequest.NewName - log.WithFields(log.Fields{ "flowName": req.FlowJobName, }).Infof("renaming table '%s' to '%s'...", src, dst) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 400ecc3d01..be7f45ef4a 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -3,10 +3,12 @@ package e2e_bigquery import ( "context" "fmt" + "strings" "testing" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" + util "github.com/PeerDB-io/peer-flow/utils" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" @@ -16,26 +18,61 @@ import ( "go.temporal.io/sdk/testsuite" ) -const bigquerySuffix = "bigquery" - type PeerFlowE2ETestSuiteBQ struct { suite.Suite testsuite.WorkflowTestSuite + bqSuffix string pool *pgxpool.Pool bqHelper *BigQueryTestHelper } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteBQ)) + s := &PeerFlowE2ETestSuiteBQ{} + s.SetT(t) + s.SetupSuite() + + tests := []struct { + name string + test func(t *testing.T) + }{ + {"Test_Invalid_Connection_Config", s.Test_Invalid_Connection_Config}, + {"Test_Complete_Flow_No_Data", s.Test_Complete_Flow_No_Data}, + {"Test_Char_ColType_Error", s.Test_Char_ColType_Error}, + {"Test_Complete_Simple_Flow_BQ", s.Test_Complete_Simple_Flow_BQ}, + {"Test_Toast_BQ", s.Test_Toast_BQ}, + {"Test_Toast_Nochanges_BQ", s.Test_Toast_Nochanges_BQ}, + {"Test_Toast_Advance_1_BQ", s.Test_Toast_Advance_1_BQ}, + {"Test_Toast_Advance_2_BQ", s.Test_Toast_Advance_2_BQ}, + {"Test_Toast_Advance_3_BQ", s.Test_Toast_Advance_3_BQ}, + {"Test_Types_BQ", s.Test_Types_BQ}, + {"Test_Types_Avro_BQ", s.Test_Types_Avro_BQ}, + {"Test_Simple_Flow_BQ_Avro_CDC", s.Test_Simple_Flow_BQ_Avro_CDC}, + {"Test_Multi_Table_BQ", s.Test_Multi_Table_BQ}, + {"Test_Simple_Schema_Changes_BQ", s.Test_Simple_Schema_Changes_BQ}, + {"Test_Composite_PKey_BQ", s.Test_Composite_PKey_BQ}, + {"Test_Composite_PKey_Toast_1_BQ", s.Test_Composite_PKey_Toast_1_BQ}, + {"Test_Composite_PKey_Toast_2_BQ", s.Test_Composite_PKey_Toast_2_BQ}, + } + + // Assert that there are no duplicate test names + testNames := make(map[string]bool) + for _, tt := range tests { + if testNames[tt.name] { + t.Fatalf("duplicate test name: %s", tt.name) + } + testNames[tt.name] = true + + t.Run(tt.name, tt.test) + } } func (s *PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", bigquerySuffix, tableName) + return fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tableName) } func (s *PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, bigquerySuffix) + return fmt.Sprintf("%s_%s", input, s.bqSuffix) } // setupBigQuery sets up the bigquery connection. @@ -54,6 +91,14 @@ func (s *PeerFlowE2ETestSuiteBQ) setupBigQuery() error { return nil } +func (s *PeerFlowE2ETestSuiteBQ) setupTemporalLogger() { + logger := log.New() + logger.SetReportCaller(true) + logger.SetLevel(log.WarnLevel) + tlogger := e2e.NewTLogrusLogger(logger) + s.SetLogger(tlogger) +} + // Implement SetupAllSuite interface to setup the test suite func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { err := godotenv.Load() @@ -64,8 +109,12 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { } log.SetReportCaller(true) + log.SetLevel(log.WarnLevel) + + s.setupTemporalLogger() - pool, err := e2e.SetupPostgres(bigquerySuffix) + s.bqSuffix = strings.ToLower(util.RandomString(8)) + pool, err := e2e.SetupPostgres(s.bqSuffix) if err != nil { s.Fail("failed to setup postgres", err) } @@ -79,7 +128,7 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { // Implement TearDownAllSuite interface to tear down the test suite func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, bigquerySuffix) + err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { s.Fail("failed to drop Postgres schema", err) } @@ -90,7 +139,8 @@ func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { } } -func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -113,7 +163,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -127,7 +178,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { value VARCHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_flow_no_data"), @@ -137,7 +188,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -157,7 +208,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -171,7 +223,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { value CHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_char_table"), @@ -181,7 +233,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -204,7 +256,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -218,7 +271,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_simple_flow"), @@ -228,7 +281,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -246,7 +299,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -262,7 +315,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { s.Contains(err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - s.NoError(err) + require.NoError(t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -271,7 +324,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -284,12 +338,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_1"), @@ -299,7 +350,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -324,7 +375,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -342,7 +393,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -355,12 +407,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_2"), @@ -370,7 +419,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -388,7 +437,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -406,7 +455,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -419,12 +469,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_3"), @@ -434,7 +481,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -465,7 +512,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -483,7 +530,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -495,12 +543,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { id SERIAL PRIMARY KEY, t1 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_4"), @@ -510,7 +555,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -535,7 +580,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -553,7 +598,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -566,12 +612,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_5"), @@ -581,7 +624,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -605,7 +648,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -623,7 +666,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -637,16 +681,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_bq"), @@ -656,7 +692,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ @@ -683,8 +719,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { ARRAY[0.0003, 8902.0092], ARRAY['hello','bye']; `, srcTableName)) - s.NoError(err) - fmt.Println("Executed an insert with all types") + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -710,7 +745,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -724,16 +760,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_avro_bq"), @@ -745,7 +773,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ @@ -772,8 +800,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { ARRAY[0.0003, 1039.0034], ARRAY['hello','bye']; `, srcTableName)) - s.NoError(err) - fmt.Println("Executed an insert with all types") + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -799,7 +826,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_Avro_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -813,7 +841,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow_bq_avro_cdc"), TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -824,7 +852,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -839,7 +867,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -855,7 +883,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { s.Contains(err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - s.NoError(err) + require.NoError(t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -864,7 +892,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Flow_BQ_Avro_CDC() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -877,7 +906,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s(id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table_bq"), @@ -887,7 +916,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -903,7 +932,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed an insert on two tables") }() @@ -914,9 +943,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { err = env.GetWorkflowError() count1, err := s.bqHelper.countRows(dstTable1Name) - s.NoError(err) + require.NoError(t, err) count2, err := s.bqHelper.countRows(dstTable2Name) - s.NoError(err) + require.NoError(t, err) s.Equal(1, count1) s.Equal(1, count2) @@ -925,7 +954,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { } // TODO: not checking schema exactly, add later -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -938,7 +968,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -948,7 +978,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -962,7 +992,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -972,11 +1002,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -986,11 +1016,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1000,11 +1030,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1025,7 +1055,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1041,7 +1072,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -1051,10 +1082,10 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -1068,7 +1099,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") @@ -1078,9 +1109,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1098,7 +1129,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1113,12 +1145,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1128,7 +1157,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1140,7 +1169,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1148,18 +1177,18 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1178,7 +1207,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { +func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ(t *testing.T) { + t.Parallel() env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1193,12 +1223,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1208,10 +1235,10 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, + TotalSyncFlows: 2, MaxBatchSize: 100, } @@ -1226,16 +1253,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 5e6374cc1a..8bd4b6135f 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -11,9 +11,9 @@ import ( ) func (s *PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateSourceTableQRep(s.pool, bigquerySuffix, tableName) + err := e2e.CreateSourceTableQRep(s.pool, s.bqSuffix, tableName) s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, bigquerySuffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.pool, s.bqSuffix, tableName, rowCount) s.NoError(err) } @@ -33,7 +33,7 @@ func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsSt pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", colsString, bigquerySuffix, tableName), + fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", colsString, s.bqSuffix, tableName), ) s.NoError(err) @@ -58,10 +58,10 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { s.setupBQDestinationTable(tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - bigquerySuffix, tblName) + s.bqSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro", - fmt.Sprintf("e2e_test_%s.%s", bigquerySuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index c5228258b0..fa0ca8e40e 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -94,6 +94,25 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) { return nil, fmt.Errorf("failed to create e2e_test schema: %w", err) } + _, err = pool.Exec(context.Background(), ` + SELECT pg_advisory_lock(hashtext('peerdb_pg_setup_lock')); + CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ + SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', + round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); + $$ language sql; + CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) + RETURNS bytea AS $body$ + SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') + FROM generate_series(1, $1); + $body$ + LANGUAGE 'sql' + VOLATILE + SET search_path = 'pg_catalog'; + `) + if err != nil { + return nil, fmt.Errorf("failed to create utility functions: %w", err) + } + return pool, nil } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 9f92496eb3..163d61447e 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -5,40 +5,76 @@ import ( "fmt" "strings" "testing" + "time" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" + util "github.com/PeerDB-io/peer-flow/utils" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.temporal.io/sdk/testsuite" ) -const snowflakeSuffix = "snowflake" - type PeerFlowE2ETestSuiteSF struct { suite.Suite testsuite.WorkflowTestSuite + pgSuffix string pool *pgxpool.Pool sfHelper *SnowflakeTestHelper connector *connsnowflake.SnowflakeConnector } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteSF)) + s := &PeerFlowE2ETestSuiteSF{} + s.SetT(t) + s.SetupSuite() + + tests := []struct { + name string + test func(t *testing.T) + }{ + {"Test_Complete_Simple_Flow_SF", s.Test_Complete_Simple_Flow_SF}, + {"Test_Complete_Simple_Flow_SF_Avro_CDC", s.Test_Complete_Simple_Flow_SF_Avro_CDC}, + {"Test_Invalid_Geo_SF_Avro_CDC", s.Test_Invalid_Geo_SF_Avro_CDC}, + {"Test_Toast_SF", s.Test_Toast_SF}, + {"Test_Toast_Nochanges_SF", s.Test_Toast_Nochanges_SF}, + {"Test_Toast_Advance_1_SF", s.Test_Toast_Advance_1_SF}, + {"Test_Toast_Advance_2_SF", s.Test_Toast_Advance_2_SF}, + {"Test_Toast_Advance_3_SF", s.Test_Toast_Advance_3_SF}, + {"Test_Types_SF", s.Test_Types_SF}, + {"Test_Types_SF_Avro_CDC", s.Test_Types_SF_Avro_CDC}, + {"Test_Multi_Table_SF", s.Test_Multi_Table_SF}, + {"Test_Simple_Schema_Changes_SF", s.Test_Simple_Schema_Changes_SF}, + {"Test_Composite_PKey_SF", s.Test_Composite_PKey_SF}, + {"Test_Composite_PKey_Toast_1_SF", s.Test_Composite_PKey_Toast_1_SF}, + {"Test_Composite_PKey_Toast_2_SF", s.Test_Composite_PKey_Toast_2_SF}, + } + + // assert that there are no duplicate test names + testNames := make(map[string]bool) + for _, tt := range tests { + if testNames[tt.name] { + t.Fatalf("duplicate test name: %s", tt.name) + } + testNames[tt.name] = true + + t.Run(tt.name, tt.test) + } } func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tableName) + return fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tableName) } func (s *PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, snowflakeSuffix) + return fmt.Sprintf("%s_%s", input, s.pgSuffix) } // setupSnowflake sets up the snowflake connection. @@ -53,6 +89,14 @@ func (s *PeerFlowE2ETestSuiteSF) setupSnowflake() error { return nil } +func (s *PeerFlowE2ETestSuiteSF) setupTemporalLogger() { + logger := log.New() + logger.SetReportCaller(true) + logger.SetLevel(log.WarnLevel) + tlogger := e2e.NewTLogrusLogger(logger) + s.SetLogger(tlogger) +} + func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { err := godotenv.Load() if err != nil { @@ -62,8 +106,15 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { } log.SetReportCaller(true) + log.SetLevel(log.WarnLevel) + + s.setupTemporalLogger() + + suffix := util.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + s.pgSuffix = fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) - pool, err := e2e.SetupPostgres(snowflakeSuffix) + pool, err := e2e.SetupPostgres(s.pgSuffix) if err != nil { s.Fail("failed to setup postgres", err) } @@ -76,12 +127,12 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { s.connector, err = connsnowflake.NewSnowflakeConnector(context.Background(), s.sfHelper.Config) - s.NoError(err) + require.NoError(s.T(), err) } // Implement TearDownAllSuite interface to tear down the test suite func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, snowflakeSuffix) + err := e2e.TearDownPostgres(s.pool, s.pgSuffix) if err != nil { s.Fail("failed to drop Postgres schema", err) } @@ -94,10 +145,12 @@ func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { } err = s.connector.Close() - s.NoError(err) + require.NoError(s.T(), err) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -105,13 +158,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) + connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -120,7 +174,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -138,7 +192,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -154,7 +208,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { s.Contains(err.Error(), "continue as new") count, err := s.sfHelper.CountRows("test_simple_flow_sf") - s.NoError(err) + require.NoError(t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table @@ -163,21 +217,24 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) - srcTableName := s.attachSchemaSuffix("test_simple_flow_sf_avro_cdc") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf_avro_cdc") + tblConst := "test_simple_flow_sf_avro_cdc" + srcTableName := s.attachSchemaSuffix(tblConst) + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblConst) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow_avro"), @@ -188,7 +245,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -200,30 +257,30 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := 0; i < 15; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(t, err) } - fmt.Println("Inserted 10 rows into the source table") + fmt.Println("Inserted 15 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error s.Error(err) s.Contains(err.Error(), "continue as new") - count, err := s.sfHelper.CountRows("test_simple_flow_sf_avro_cdc") - s.NoError(err) - s.Equal(10, count) + count, err := s.sfHelper.CountRows(tblConst) + require.NoError(t, err) + s.Equal(15, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side @@ -231,7 +288,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -239,13 +298,13 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_invalid_geo_sf_avro_cdc") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, line GEOMETRY(LINESTRING) NOT NULL, poly GEOGRAPHY(POLYGON) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"), @@ -256,7 +315,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -277,7 +336,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", ) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { @@ -287,7 +346,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ "00f03f000000000000000000000000000000000000000000000000") - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 6 valid geography rows and 10 total rows into source") }() @@ -305,11 +364,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { // We inserted 4 invalid shapes in each. // They should have filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") - s.NoError(err) + require.NoError(t, err) s.Equal(6, lineCount) polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly") - s.NoError(err) + require.NoError(t, err) s.Equal(6, polyCount) // TODO: verify that the data is correctly synced to the destination table @@ -318,7 +377,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -326,17 +387,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_1") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_1"), @@ -346,7 +404,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -371,7 +429,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -389,7 +447,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -397,17 +457,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_2") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( - id SERIAL PRIMARY KEY, - t1 text, - t2 text, - k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - `, srcTableName)) - s.NoError(err) + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + t1 text, + t2 text, + k int + ); +`, srcTableName, srcTableName)) + log.Infof("Creating table '%s', err: %v", srcTableName, err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_2"), @@ -417,7 +476,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -435,7 +494,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -453,7 +512,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -461,17 +522,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_3") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - `, srcTableName)) - s.NoError(err) + ); + `, srcTableName, srcTableName)) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_3"), @@ -481,7 +540,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -512,7 +571,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -530,7 +589,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -538,16 +599,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_4") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - `, srcTableName)) - s.NoError(err) + ); + `, srcTableName, srcTableName)) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_4"), @@ -557,7 +616,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -582,7 +641,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -600,7 +659,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -608,17 +669,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_5") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, t1 text, t2 text, k int - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; - `, srcTableName)) - s.NoError(err) + ); +`, srcTableName, srcTableName)) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_5"), @@ -628,7 +687,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -652,7 +711,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -670,7 +729,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -678,23 +739,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, + SELECT pg_advisory_lock(hashtext('%s')); + CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; - `, srcTableName)) - s.NoError(err) + `, srcTableName, srcTableName)) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -704,7 +758,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -730,8 +784,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) - fmt.Println("Executed an insert with all types") + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -757,7 +810,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -765,23 +820,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf_avro_cdc") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, + CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); - CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) - RETURNS bytea AS $body$ - SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') - FROM generate_series(1, $1); - $body$ - LANGUAGE 'sql' - VOLATILE - SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -792,7 +839,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -818,8 +865,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) - fmt.Println("Executed an insert with all types") + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -845,7 +891,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -855,10 +903,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { dstTable2Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test2_sf") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s (id serial primary key, c1 int, c2 text); - CREATE TABLE %s (id serial primary key, c1 int, c2 text); + CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); + CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table"), @@ -868,7 +916,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -884,8 +932,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) - fmt.Println("Executed an insert with all types") + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -895,9 +942,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") - s.NoError(err) + require.NoError(t, err) count2, err := s.sfHelper.CountRows("test2_sf") - s.NoError(err) + require.NoError(t, err) s.Equal(1, count1) s.Equal(1, count2) @@ -905,7 +952,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -918,7 +967,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -928,7 +977,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -942,7 +991,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -958,18 +1007,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -986,18 +1035,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1015,18 +1064,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1044,7 +1093,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) }() @@ -1062,7 +1111,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1078,7 +1129,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -1088,7 +1139,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 5, @@ -1105,7 +1156,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") @@ -1115,9 +1166,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1136,7 +1187,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1151,12 +1204,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1166,7 +1216,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1178,7 +1228,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1186,18 +1236,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1216,7 +1266,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env.AssertExpectations(s.T()) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { + t.Parallel() + env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1231,12 +1283,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ - SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', - round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); - $$ language sql; + ); `, srcTableName)) - s.NoError(err) + require.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1246,7 +1295,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1264,16 +1313,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index cdcfaeca98..82901beac2 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -12,9 +12,9 @@ import ( ) func (s *PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateSourceTableQRep(s.pool, snowflakeSuffix, tableName) + err := e2e.CreateSourceTableQRep(s.pool, s.pgSuffix, tableName) s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, snowflakeSuffix, tableName, rowCount) + err = e2e.PopulateSourceTable(s.pool, s.pgSuffix, tableName, rowCount) s.NoError(err) } @@ -35,12 +35,12 @@ func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, select pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, snowflakeSuffix, tableName), + fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, s.pgSuffix, tableName), ) require.NoError(s.T(), err) // read rows from destination table - qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) + qualifiedTableName := fmt.Sprintf("%s.%s.%s", s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) var sfSelQuery string if caseSensitive { sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY "id"`, selector, qualifiedTableName) @@ -68,11 +68,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_sf", - fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, @@ -109,11 +109,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_sf", - fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, @@ -154,7 +154,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_sf", @@ -194,11 +194,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_sf_xmin", - fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, @@ -239,7 +239,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + s.pgSuffix, tblName) sfPeer := s.sfHelper.Peer sfPeer.GetSnowflakeConfig().S3Integration = "peerdb_s3_integration" diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index aad017c01a..9c2b27bcb0 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -19,7 +19,7 @@ import ( "go.temporal.io/sdk/testsuite" ) -const sqlserverSuffix = "s3" +const sqlserverSuffix = "sqlserver" type PeerFlowE2ETestSuiteSQLServer struct { suite.Suite diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 4f53e9a6a4..f26afc2ee1 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -72,7 +72,6 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, } if state.SetupComplete { - fmt.Println("query indicates setup is complete") break } } else { @@ -233,7 +232,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro deal_id, ethereum_transaction_id, ignore_price, card_eth_value, paid_eth_price, card_bought_notified, address, account_id, asset_id, status, transaction_id, settled_at, reference_id, - settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8 + settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8 %s ) VALUES %s; `, suffix, tableName, geoColumns, strings.Join(rows, ","))) @@ -357,3 +356,49 @@ func GetOwnersSelectorString() string { } return strings.Join(fields, ",") } + +// implement temporal logger interface with logrus +// +// type Logger interface { +// Debug(msg string, keyvals ...interface{}) +// Info(msg string, keyvals ...interface{}) +// Warn(msg string, keyvals ...interface{}) +// Error(msg string, keyvals ...interface{}) +// } +type TLogrusLogger struct { + logger *log.Logger +} + +func NewTLogrusLogger(logger *log.Logger) *TLogrusLogger { + return &TLogrusLogger{logger: logger} +} + +func (l *TLogrusLogger) keyvalsToFields(keyvals []interface{}) log.Fields { + fields := make(log.Fields) + for i := 0; i < len(keyvals); i += 2 { + key := fmt.Sprintf("%v", keyvals[i]) + if i+1 < len(keyvals) { + fields[key] = keyvals[i+1] + } else { + // Handle the case where there is no value for the key + fields[key] = nil // or some default value + } + } + return fields +} + +func (l *TLogrusLogger) Debug(msg string, keyvals ...interface{}) { + l.logger.WithFields(l.keyvalsToFields(keyvals)).Debug(msg) +} + +func (l *TLogrusLogger) Info(msg string, keyvals ...interface{}) { + l.logger.WithFields(l.keyvalsToFields(keyvals)).Info(msg) +} + +func (l *TLogrusLogger) Warn(msg string, keyvals ...interface{}) { + l.logger.WithFields(l.keyvalsToFields(keyvals)).Warn(msg) +} + +func (l *TLogrusLogger) Error(msg string, keyvals ...interface{}) { + l.logger.WithFields(l.keyvalsToFields(keyvals)).Error(msg) +}