From 2002605c8a39f65054d8609cbc228ac0ea5d1942 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 6 Mar 2024 01:13:36 +0000 Subject: [PATCH] POC: generic Test_Simple_Flow --- .../postgres/postgres_schema_delta_test.go | 40 +++---- flow/e2e/bigquery/bigquery.go | 98 ++++++++++++++++ flow/e2e/bigquery/peer_flow_bq_test.go | 101 +---------------- flow/e2e/generic/peer_flow_test.go | 89 +++++++++++++++ flow/e2e/postgres/postgres.go | 75 +++++++++++++ flow/e2e/postgres/qrep_flow_pg_test.go | 51 +-------- flow/e2e/s3/qrep_flow_s3_test.go | 30 ++--- flow/e2e/snowflake/peer_flow_sf_test.go | 100 +---------------- flow/e2e/snowflake/snowflake.go | 106 ++++++++++++++++++ .../snowflake/snowflake_schema_delta_test.go | 10 +- .../e2e/sqlserver/qrep_flow_sqlserver_test.go | 24 ++-- flow/e2e/test_utils.go | 16 +++ flow/e2eshared/e2eshared.go | 8 +- 13 files changed, 442 insertions(+), 306 deletions(-) create mode 100644 flow/e2e/bigquery/bigquery.go create mode 100644 flow/e2e/generic/peer_flow_test.go create mode 100644 flow/e2e/postgres/postgres.go create mode 100644 flow/e2e/snowflake/snowflake.go diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index c059c36836..e5603b02d9 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -196,23 +196,25 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PostgresSchemaDeltaTestSuite) { - teardownTx, err := s.connector.conn.Begin(context.Background()) - require.NoError(s.t, err) - defer func() { - err := teardownTx.Rollback(context.Background()) - if err != pgx.ErrTxClosed { - require.NoError(s.t, err) - } - }() - _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", - s.schema)) - require.NoError(s.t, err) - err = teardownTx.Commit(context.Background()) - require.NoError(s.t, err) - - require.NoError(s.t, s.connector.ConnectionActive(context.Background())) - require.NoError(s.t, s.connector.Close()) - require.Error(s.t, s.connector.ConnectionActive(context.Background())) - }) + e2eshared.RunSuite(t, SetupSuite) +} + +func (s PostgresSchemaDeltaTestSuite) Teardown() { + teardownTx, err := s.connector.conn.Begin(context.Background()) + require.NoError(s.t, err) + defer func() { + err := teardownTx.Rollback(context.Background()) + if err != pgx.ErrTxClosed { + require.NoError(s.t, err) + } + }() + _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", + s.schema)) + require.NoError(s.t, err) + err = teardownTx.Commit(context.Background()) + require.NoError(s.t, err) + + require.NoError(s.t, s.connector.ConnectionActive(context.Background())) + require.NoError(s.t, s.connector.Close()) + require.Error(s.t, s.connector.ConnectionActive(context.Background())) } diff --git a/flow/e2e/bigquery/bigquery.go b/flow/e2e/bigquery/bigquery.go new file mode 100644 index 0000000000..a2db8db8f5 --- /dev/null +++ b/flow/e2e/bigquery/bigquery.go @@ -0,0 +1,98 @@ +package e2e_bigquery + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuiteBQ struct { + t *testing.T + + bqSuffix string + conn *connpostgres.PostgresConnector + bqHelper *BigQueryTestHelper +} + +func (s PeerFlowE2ETestSuiteBQ) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn { + return s.conn.Conn() +} + +func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s PeerFlowE2ETestSuiteBQ) Suffix() string { + return s.bqSuffix +} + +func (s PeerFlowE2ETestSuiteBQ) Peer() *protos.Peer { + return s.bqHelper.Peer +} + +func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) + bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) + s.t.Logf("running query on bigquery: %s", bqSelQuery) + return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) +} + +func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) + bqSelQuery := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, qualifiedTableName, where) + s.t.Logf("running query on bigquery: %s", bqSelQuery) + return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) +} + +func (s PeerFlowE2ETestSuiteBQ) Teardown() { + e2e.TearDownPostgres(s) + + err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId) + if err != nil { + s.t.Fatalf("failed to tear down bigquery: %v", err) + } +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { + t.Helper() + + suffix := shared.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) + conn, err := e2e.SetupPostgres(t, bqSuffix) + if err != nil || conn == nil { + t.Fatalf("failed to setup postgres: %v", err) + } + + bqHelper, err := NewBigQueryTestHelper() + if err != nil { + t.Fatalf("Failed to create helper: %v", err) + } + + err = bqHelper.RecreateDataset() + if err != nil { + t.Fatalf("Failed to recreate dataset: %v", err) + } + + return PeerFlowE2ETestSuiteBQ{ + t: t, + bqSuffix: bqSuffix, + conn: conn, + bqHelper: bqHelper, + } +} diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 14e95deda2..6b02f8faf8 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -4,76 +4,23 @@ import ( "context" "errors" "fmt" - "strings" "testing" "time" "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" - connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) -type PeerFlowE2ETestSuiteBQ struct { - t *testing.T - - bqSuffix string - conn *connpostgres.PostgresConnector - bqHelper *BigQueryTestHelper -} - -func (s PeerFlowE2ETestSuiteBQ) T() *testing.T { - return s.t -} - -func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn { - return s.conn.Conn() -} - -func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector { - return s.conn -} - -func (s PeerFlowE2ETestSuiteBQ) Suffix() string { - return s.bqSuffix -} - -func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) { - s.t.Helper() - qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) - s.t.Logf("running query on bigquery: %s", bqSelQuery) - return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) -} - -func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) { - s.t.Helper() - qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqSelQuery := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, qualifiedTableName, where) - s.t.Logf("running query on bigquery: %s", bqSelQuery) - return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) -} - func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) { - e2e.TearDownPostgres(s) - - err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId) - if err != nil { - s.t.Fatalf("failed to tear down bigquery: %v", err) - } - }) + e2eshared.RunSuite(t, SetupSuite) } func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, value string) error { @@ -147,52 +94,6 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel return nil } -// setupBigQuery sets up the bigquery connection. -func setupBigQuery(t *testing.T) *BigQueryTestHelper { - t.Helper() - - bqHelper, err := NewBigQueryTestHelper() - if err != nil { - t.Fatalf("Failed to create helper: %v", err) - } - - err = bqHelper.RecreateDataset() - if err != nil { - t.Fatalf("Failed to recreate dataset: %v", err) - } - - return bqHelper -} - -// Implement SetupAllSuite interface to setup the test suite -func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { - t.Helper() - - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - - suffix := shared.RandomString(8) - tsSuffix := time.Now().Format("20060102150405") - bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) - conn, err := e2e.SetupPostgres(t, bqSuffix) - if err != nil || conn == nil { - t.Fatalf("failed to setup postgres: %v", err) - } - - bq := setupBigQuery(t) - - return PeerFlowE2ETestSuiteBQ{ - t: t, - bqSuffix: bqSuffix, - conn: conn, - bqHelper: bq, - } -} - func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { tc := e2e.NewTemporalClient(s.t) diff --git a/flow/e2e/generic/peer_flow_test.go b/flow/e2e/generic/peer_flow_test.go new file mode 100644 index 0000000000..9ccda8484f --- /dev/null +++ b/flow/e2e/generic/peer_flow_test.go @@ -0,0 +1,89 @@ +package e2e_generic + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/bigquery" + "github.com/PeerDB-io/peer-flow/e2e/postgres" + "github.com/PeerDB-io/peer-flow/e2e/snowflake" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/generated/protos" + peerflow "github.com/PeerDB-io/peer-flow/workflows" +) + +type GenericSuite interface { + e2e.RowSource + Peer() *protos.Peer +} + +func TestGenericPG(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite)) +} + +func TestGenericSF(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite)) +} + +func TestGenericBQ(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite)) +} + +type GenericWrapper struct { + GenericSuite +} + +func SetupGenericSuite[T GenericSuite](f func(t *testing.T) T) func(t *testing.T) GenericWrapper { + return func(t *testing.T) GenericWrapper { + t.Helper() + return GenericWrapper{f(t)} + } +} + +func (s GenericWrapper) Test_Simple_Flow() { + t := s.T() + srcTableName := e2e.AttachSchema(s, "test_simple_flow") + dstTableName := e2e.AttachSchema(s, "test_simple_flow_dst") + + _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL, + myh HSTORE NOT NULL + ); + `, srcTableName)) + require.NoError(t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, "test_simple_flow"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + Destination: s.Peer(), + } + + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + flowConnConfig.MaxBatchSize = 100 + + tc := e2e.NewTemporalClient(t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + + e2e.SetupCDCFlowStatusQuery(t, env, connectionGen) + // insert 10 rows into the source table + for i := range 10 { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') + `, srcTableName), testKey, testValue) + e2e.EnvNoError(t, env, err) + } + t.Log("Inserted 10 rows into the source table") + + e2e.EnvWaitForEqualTables(env, s, "normalizing 10 rows", "id,key,value,myh", `id,key,value,myh`) + env.Cancel() + e2e.RequireEnvCanceled(t, env) +} diff --git a/flow/e2e/postgres/postgres.go b/flow/e2e/postgres/postgres.go new file mode 100644 index 0000000000..1ce3452c98 --- /dev/null +++ b/flow/e2e/postgres/postgres.go @@ -0,0 +1,75 @@ +package e2e_postgres + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuitePG struct { + t *testing.T + + conn *connpostgres.PostgresConnector + peer *protos.Peer + suffix string +} + +func (s PeerFlowE2ETestSuitePG) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn { + return s.conn.Conn() +} + +func (s PeerFlowE2ETestSuitePG) Suffix() string { + return s.suffix +} + +func (s PeerFlowE2ETestSuitePG) Peer() *protos.Peer { + return s.peer +} + +func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error) { + s.t.Helper() + pgQueryExecutor := s.conn.NewQRepQueryExecutor("testflow", "testpart") + pgQueryExecutor.SetTestEnv(true) + + return pgQueryExecutor.ExecuteAndProcessQuery( + context.Background(), + fmt.Sprintf(`SELECT %s FROM e2e_test_%s.%s ORDER BY id`, cols, s.suffix, connpostgres.QuoteIdentifier(table)), + ) +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { + t.Helper() + + suffix := "pg_" + strings.ToLower(shared.RandomString(8)) + conn, err := e2e.SetupPostgres(t, suffix) + require.NoError(t, err, "failed to setup postgres") + + return PeerFlowE2ETestSuitePG{ + t: t, + conn: conn, + peer: e2e.GeneratePostgresPeer(), + suffix: suffix, + } +} + +func (s PeerFlowE2ETestSuitePG) Teardown() { + e2e.TearDownPostgres(s) +} diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index fb49d3d242..abb7867d24 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -10,7 +10,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -21,56 +20,8 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -type PeerFlowE2ETestSuitePG struct { - t *testing.T - - conn *connpostgres.PostgresConnector - peer *protos.Peer - suffix string -} - -func (s PeerFlowE2ETestSuitePG) T() *testing.T { - return s.t -} - -func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn { - return s.conn.Conn() -} - -func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector { - return s.conn -} - -func (s PeerFlowE2ETestSuitePG) Suffix() string { - return s.suffix -} - func TestPeerFlowE2ETestSuitePG(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuitePG) { - e2e.TearDownPostgres(s) - }) -} - -func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { - t.Helper() - - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - - suffix := "pg_" + strings.ToLower(shared.RandomString(8)) - conn, err := e2e.SetupPostgres(t, suffix) - require.NoError(t, err, "failed to setup postgres") - - return PeerFlowE2ETestSuitePG{ - t: t, - conn: conn, - peer: e2e.GeneratePostgresPeer(), - suffix: suffix, - } + e2eshared.RunSuite(t, SetupSuite) } func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 240f3a78b9..d59378d52c 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -36,21 +35,12 @@ func (s PeerFlowE2ETestSuiteS3) Suffix() string { return s.suffix } -func tearDownSuite(s PeerFlowE2ETestSuiteS3) { - e2e.TearDownPostgres(s) - - err := s.s3Helper.CleanUp(context.Background()) - if err != nil { - require.Fail(s.t, "failed to clean up s3", err) - } -} - func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - e2eshared.RunSuite(t, SetupSuiteS3, tearDownSuite) + e2eshared.RunSuite(t, SetupSuiteS3) } func TestPeerFlowE2ETestSuiteGCS(t *testing.T) { - e2eshared.RunSuite(t, SetupSuiteGCS, tearDownSuite) + e2eshared.RunSuite(t, SetupSuiteGCS) } func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { @@ -63,13 +53,6 @@ func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 { t.Helper() - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - suffix := "s3_" + strings.ToLower(shared.RandomString(8)) conn, err := e2e.SetupPostgres(t, suffix) if err != nil || conn == nil { @@ -89,6 +72,15 @@ func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 { } } +func (s PeerFlowE2ETestSuiteS3) Teardown() { + e2e.TearDownPostgres(s) + + err := s.s3Helper.CleanUp(context.Background()) + if err != nil { + require.Fail(s.t, "failed to clean up s3", err) + } +} + func SetupSuiteS3(t *testing.T) PeerFlowE2ETestSuiteS3 { t.Helper() return setupSuite(t, false) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 9eaf491e47..dd00a705de 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -9,121 +9,27 @@ import ( "time" "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" - connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" - connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) -type PeerFlowE2ETestSuiteSF struct { - t *testing.T - - pgSuffix string - conn *connpostgres.PostgresConnector - sfHelper *SnowflakeTestHelper - connector *connsnowflake.SnowflakeConnector -} - -func (s PeerFlowE2ETestSuiteSF) T() *testing.T { - return s.t -} - -func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector { - return s.conn -} - -func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { - return s.Connector().Conn() -} - -func (s PeerFlowE2ETestSuiteSF) Suffix() string { - return s.pgSuffix -} - -func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error) { - s.t.Helper() - qualifiedTableName := fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) - sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) - s.t.Logf("running query on snowflake: %s", sfSelQuery) - return s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) -} - func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { - e2e.TearDownPostgres(s) - - if s.sfHelper != nil { - err := s.sfHelper.Cleanup() - if err != nil { - s.t.Fatalf("failed to tear down Snowflake: %v", err) - } - } - - err := s.connector.Close() - if err != nil { - s.t.Fatalf("failed to close Snowflake connector: %v", err) - } - }) + e2eshared.RunSuite(t, SetupSuite) } func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tableName) + return e2e.AttachSchema(s, tableName) } func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, s.pgSuffix) -} - -func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { - t.Helper() - - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - - suffix := shared.RandomString(8) - tsSuffix := time.Now().Format("20060102150405") - pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) - - conn, err := e2e.SetupPostgres(t, pgSuffix) - if err != nil || conn == nil { - t.Fatalf("failed to setup Postgres: %v", err) - } - - sfHelper, err := NewSnowflakeTestHelper() - if err != nil { - t.Fatalf("failed to setup Snowflake: %v", err) - } - - connector, err := connsnowflake.NewSnowflakeConnector( - context.Background(), - sfHelper.Config, - ) - require.NoError(t, err) - - suite := PeerFlowE2ETestSuiteSF{ - t: t, - pgSuffix: pgSuffix, - conn: conn, - sfHelper: sfHelper, - connector: connector, - } - - return suite + return e2e.AddSuffix(s, input) } func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { diff --git a/flow/e2e/snowflake/snowflake.go b/flow/e2e/snowflake/snowflake.go new file mode 100644 index 0000000000..78787c41d5 --- /dev/null +++ b/flow/e2e/snowflake/snowflake.go @@ -0,0 +1,106 @@ +package e2e_snowflake + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuiteSF struct { + t *testing.T + + pgSuffix string + conn *connpostgres.PostgresConnector + sfHelper *SnowflakeTestHelper + connector *connsnowflake.SnowflakeConnector +} + +func (s PeerFlowE2ETestSuiteSF) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { + return s.Connector().Conn() +} + +func (s PeerFlowE2ETestSuiteSF) Suffix() string { + return s.pgSuffix +} + +func (s PeerFlowE2ETestSuiteSF) Peer() *protos.Peer { + return s.sfHelper.Peer +} + +func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) + sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) + s.t.Logf("running query on snowflake: %s", sfSelQuery) + return s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { + t.Helper() + + suffix := shared.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) + + conn, err := e2e.SetupPostgres(t, pgSuffix) + if err != nil || conn == nil { + t.Fatalf("failed to setup Postgres: %v", err) + } + + sfHelper, err := NewSnowflakeTestHelper() + if err != nil { + t.Fatalf("failed to setup Snowflake: %v", err) + } + + connector, err := connsnowflake.NewSnowflakeConnector( + context.Background(), + sfHelper.Config, + ) + require.NoError(t, err) + + suite := PeerFlowE2ETestSuiteSF{ + t: t, + pgSuffix: pgSuffix, + conn: conn, + sfHelper: sfHelper, + connector: connector, + } + + return suite +} + +func (s PeerFlowE2ETestSuiteSF) Teardown() { + e2e.TearDownPostgres(s) + + if s.sfHelper != nil { + err := s.sfHelper.Cleanup() + if err != nil { + s.t.Fatalf("failed to tear down Snowflake: %v", err) + } + } + + err := s.connector.Close() + if err != nil { + s.t.Fatalf("failed to close Snowflake connector: %v", err) + } +} diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index 68c70e56aa..d607e90451 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -304,9 +304,11 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) } +func (s SnowflakeSchemaDeltaTestSuite) Teardown() { + require.NoError(s.t, s.sfTestHelper.Cleanup()) + require.NoError(s.t, s.connector.Close()) +} + func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { - e2eshared.RunSuite(t, setupSchemaDeltaSuite, func(s SnowflakeSchemaDeltaTestSuite) { - require.NoError(s.t, s.sfTestHelper.Cleanup()) - require.NoError(s.t, s.connector.Close()) - }) + e2eshared.RunSuite(t, setupSchemaDeltaSuite) } diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 448d0f32b7..153fa07cc6 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -11,7 +11,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -48,26 +47,21 @@ func (s PeerFlowE2ETestSuiteSQLServer) Suffix() string { } func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSQLServer) { - e2e.TearDownPostgres(s) + e2eshared.RunSuite(t, SetupSuite) +} - if s.sqlsHelper != nil { - err := s.sqlsHelper.CleanUp() - require.NoError(s.t, err) - } - }) +func (s PeerFlowE2ETestSuiteSQLServer) Teardown() { + e2e.TearDownPostgres(s) + + if s.sqlsHelper != nil { + err := s.sqlsHelper.CleanUp() + require.NoError(s.t, err) + } } func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSQLServer { t.Helper() - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - suffix := "sqls_" + strings.ToLower(shared.RandomString(8)) conn, err := e2e.SetupPostgres(t, suffix) if err != nil { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 8592eb406e..b4e15f1cfb 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -15,6 +15,7 @@ import ( "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" + "github.com/joho/godotenv" "github.com/stretchr/testify/require" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/client" @@ -33,7 +34,14 @@ import ( peerflow "github.com/PeerDB-io/peer-flow/workflows" ) +func init() { + // it's okay if the .env file is not present + // we will use the default values + _ = godotenv.Load() +} + type Suite interface { + e2eshared.Suite T() *testing.T Connector() *connpostgres.PostgresConnector Suffix() string @@ -44,6 +52,14 @@ type RowSource interface { GetRows(table, cols string) (*model.QRecordBatch, error) } +func AttachSchema(s Suite, table string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.Suffix(), table) +} + +func AddSuffix(s Suite, str string) string { + return fmt.Sprintf("%s_%s", str, s.Suffix()) +} + // Helper function to assert errors in go routines running concurrent to workflows // This achieves two goals: // 1. cancel workflow to avoid waiting on goroutine which has failed diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index b242ebb1ea..087ff58014 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -12,7 +12,11 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) -func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) { +type Suite interface { + Teardown() +} + +func RunSuite[T Suite](t *testing.T, setup func(t *testing.T) T) { t.Helper() t.Parallel() @@ -26,7 +30,7 @@ func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) subtest.Parallel() suite := setup(subtest) subtest.Cleanup(func() { - teardown(suite) + suite.Teardown() }) m.Func.Call([]reflect.Value{reflect.ValueOf(suite)}) })