From 970656de9936f8ee093c3e6027c54b1dee335a4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 12 Mar 2024 17:11:49 +0000 Subject: [PATCH] Move PG/SF simple schema changes to generic test BQ would need to implement GetTableSchema interface --- flow/connectors/core.go | 10 +- flow/e2e/bigquery/bigquery.go | 6 + flow/e2e/bigquery/peer_flow_bq_test.go | 3 +- flow/e2e/generic/generic_test.go | 304 +++++++++++++++++++++++++ flow/e2e/generic/peer_flow_test.go | 82 ------- flow/e2e/postgres/postgres.go | 5 + flow/e2e/snowflake/snowflake.go | 9 + flow/e2e/test_utils.go | 15 ++ 8 files changed, 350 insertions(+), 84 deletions(-) create mode 100644 flow/e2e/generic/generic_test.go delete mode 100644 flow/e2e/generic/peer_flow_test.go diff --git a/flow/connectors/core.go b/flow/connectors/core.go index b374ff70e4..ed7b3cb223 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -27,11 +27,16 @@ type Connector interface { ConnectionActive(context.Context) error } -type CDCPullConnector interface { +type GetTableSchemaConnector interface { Connector // GetTableSchema returns the schema of a table. GetTableSchema(ctx context.Context, req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error) +} + +type CDCPullConnector interface { + Connector + GetTableSchemaConnector // EnsurePullability ensures that the connector is pullable. EnsurePullability(ctx context.Context, req *protos.EnsurePullabilityBatchInput) ( @@ -252,6 +257,9 @@ var ( _ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{} _ CDCNormalizeConnector = &connclickhouse.ClickhouseConnector{} + _ GetTableSchemaConnector = &connpostgres.PostgresConnector{} + _ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{} + _ NormalizedTablesConnector = &connpostgres.PostgresConnector{} _ NormalizedTablesConnector = &connbigquery.BigQueryConnector{} _ NormalizedTablesConnector = &connsnowflake.SnowflakeConnector{} diff --git a/flow/e2e/bigquery/bigquery.go b/flow/e2e/bigquery/bigquery.go index 73f5c38d6e..1e2a3842ee 100644 --- a/flow/e2e/bigquery/bigquery.go +++ b/flow/e2e/bigquery/bigquery.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5" + "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -35,6 +36,11 @@ func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector { return s.conn } +func (s PeerFlowE2ETestSuiteBQ) DestinationConnector() connectors.Connector { + // TODO have BQ connector + return nil +} + func (s PeerFlowE2ETestSuiteBQ) Suffix() string { return s.bqSuffix } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index ec28b5f97b..3dff6e310e 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -643,7 +643,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { e2e.RequireEnvCanceled(s.t, env) } -// TODO: not checking schema exactly, add later +// TODO: not checking schema exactly +// write a GetTableSchemaConnector for BQ to enable generic_test func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { tc := e2e.NewTemporalClient(s.t) diff --git a/flow/e2e/generic/generic_test.go b/flow/e2e/generic/generic_test.go new file mode 100644 index 0000000000..c4500ef505 --- /dev/null +++ b/flow/e2e/generic/generic_test.go @@ -0,0 +1,304 @@ +package e2e_generic + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/connectors" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/bigquery" + "github.com/PeerDB-io/peer-flow/e2e/postgres" + "github.com/PeerDB-io/peer-flow/e2e/snowflake" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model/qvalue" + peerflow "github.com/PeerDB-io/peer-flow/workflows" +) + +func TestGenericPG(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite)) +} + +func TestGenericSF(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite)) +} + +func TestGenericBQ(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite)) +} + +type Generic struct { + e2e.GenericSuite +} + +func SetupGenericSuite[T e2e.GenericSuite](f func(t *testing.T) T) func(t *testing.T) Generic { + return func(t *testing.T) Generic { + t.Helper() + return Generic{f(t)} + } +} + +func (s Generic) Test_Simple_Flow() { + t := s.T() + srcTable := "test_simple" + dstTable := "test_simple_dst" + srcSchemaTable := e2e.AttachSchema(s, srcTable) + + _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL, + myh HSTORE NOT NULL + ); + `, srcSchemaTable)) + require.NoError(t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, "test_simple"), + TableMappings: e2e.TableMappings(s, srcTable, dstTable), + Destination: s.Peer(), + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + + tc := e2e.NewTemporalClient(t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + + e2e.SetupCDCFlowStatusQuery(t, env, connectionGen) + // insert 10 rows into the source table + for i := range 10 { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') + `, srcSchemaTable), testKey, testValue) + e2e.EnvNoError(t, env, err) + } + t.Log("Inserted 10 rows into the source table") + + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,key,value,myh`) + env.Cancel() + e2e.RequireEnvCanceled(t, env) +} + +func (s Generic) Test_Simple_Schema_Changes() { + t := s.T() + + destinationSchemaConnector, ok := s.DestinationConnector().(connectors.GetTableSchemaConnector) + if !ok { + t.SkipNow() + } + + srcTable := "test_simple_schema_changes" + dstTable := "test_simple_schema_changes_dst" + srcTableName := e2e.AttachSchema(s, srcTable) + dstTableName := s.DestinationTable(dstTable) + + _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 BIGINT + ); + `, srcTableName)) + require.NoError(t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, srcTable), + TableMappings: e2e.TableMappings(s, srcTable, dstTable), + Destination: s.Peer(), + } + + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + + // wait for PeerFlowStatusQuery to finish setup + // and then insert and mutate schema repeatedly. + tc := e2e.NewTemporalClient(t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(t, env, connectionGen) + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) + e2e.EnvNoError(t, env, err) + t.Log("Inserted initial row in the source table") + + e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", "test_simple_schema_changes", "id,c1") + + expectedTableSchema := &protos.TableSchema{ + TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable), + Columns: []*protos.FieldDescription{ + { + Name: e2e.ExpectedDestinationIdentifier(s, "id"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c1"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: "_PEERDB_IS_DELETED", + Type: string(qvalue.QValueKindBoolean), + TypeModifier: -1, + }, + { + Name: "_PEERDB_SYNCED_AT", + Type: string(qvalue.QValueKindTimestamp), + TypeModifier: -1, + }, + }, + } + output, err := destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + e2e.EnvNoError(t, env, err) + e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) + + // alter source table, add column c2 and insert another row. + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) + e2e.EnvNoError(t, env, err) + t.Log("Altered source table, added column c2") + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) + e2e.EnvNoError(t, env, err) + t.Log("Inserted row with added c2 in the source table") + + // verify we got our two rows, if schema did not match up it will error. + e2e.EnvWaitForEqualTables(env, s, "normalize altered row", "test_simple_schema_changes", "id,c1,c2") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable), + Columns: []*protos.FieldDescription{ + { + Name: e2e.ExpectedDestinationIdentifier(s, "id"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c1"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: "_PEERDB_SYNCED_AT", + Type: string(qvalue.QValueKindTimestamp), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c2"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + }, + } + output, err = destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + e2e.EnvNoError(t, env, err) + e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2") + + // alter source table, add column c3, drop column c2 and insert another row. + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) + e2e.EnvNoError(t, env, err) + t.Log("Altered source table, dropped column c2 and added column c3") + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) + e2e.EnvNoError(t, env, err) + t.Log("Inserted row with added c3 in the source table") + + // verify we got our two rows, if schema did not match up it will error. + e2e.EnvWaitForEqualTables(env, s, "normalize dropped c2 column", "test_simple_schema_changes", "id,c1,c3") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable), + Columns: []*protos.FieldDescription{ + { + Name: e2e.ExpectedDestinationIdentifier(s, "id"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c1"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: "_PEERDB_SYNCED_AT", + Type: string(qvalue.QValueKindTimestamp), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c2"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c3"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + }, + } + output, err = destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + e2e.EnvNoError(t, env, err) + e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3") + + // alter source table, drop column c3 and insert another row. + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + ALTER TABLE %s DROP COLUMN c3`, srcTableName)) + e2e.EnvNoError(t, env, err) + t.Log("Altered source table, dropped column c3") + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) + e2e.EnvNoError(t, env, err) + t.Log("Inserted row after dropping all columns in the source table") + + // verify we got our two rows, if schema did not match up it will error. + e2e.EnvWaitForEqualTables(env, s, "normalize dropped c3 column", "test_simple_schema_changes", "id,c1") + expectedTableSchema = &protos.TableSchema{ + TableIdentifier: e2e.ExpectedDestinationTableName(s, dstTable), + Columns: []*protos.FieldDescription{ + { + Name: e2e.ExpectedDestinationIdentifier(s, "id"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c1"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: "_PEERDB_SYNCED_AT", + Type: string(qvalue.QValueKindTimestamp), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c2"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + { + Name: e2e.ExpectedDestinationIdentifier(s, "c3"), + Type: string(qvalue.QValueKindNumeric), + TypeModifier: -1, + }, + }, + } + output, err = destinationSchemaConnector.GetTableSchema(context.Background(), &protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, + }) + e2e.EnvNoError(t, env, err) + e2e.EnvTrue(t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") + + env.Cancel() + + e2e.RequireEnvCanceled(t, env) +} diff --git a/flow/e2e/generic/peer_flow_test.go b/flow/e2e/generic/peer_flow_test.go deleted file mode 100644 index 20c5847df4..0000000000 --- a/flow/e2e/generic/peer_flow_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package e2e_generic - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/e2e/bigquery" - "github.com/PeerDB-io/peer-flow/e2e/postgres" - "github.com/PeerDB-io/peer-flow/e2e/snowflake" - "github.com/PeerDB-io/peer-flow/e2eshared" - peerflow "github.com/PeerDB-io/peer-flow/workflows" -) - -func TestGenericPG(t *testing.T) { - e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite)) -} - -func TestGenericSF(t *testing.T) { - e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite)) -} - -func TestGenericBQ(t *testing.T) { - e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite)) -} - -type Generic struct { - e2e.GenericSuite -} - -func SetupGenericSuite[T e2e.GenericSuite](f func(t *testing.T) T) func(t *testing.T) Generic { - return func(t *testing.T) Generic { - t.Helper() - return Generic{f(t)} - } -} - -func (s Generic) Test_Simple_Flow() { - t := s.T() - srcTable := "test_simple" - dstTable := "test_simple_dst" - srcSchemaTable := e2e.AttachSchema(s, srcTable) - - _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL, - myh HSTORE NOT NULL - ); - `, srcSchemaTable)) - require.NoError(t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: e2e.AddSuffix(s, "test_simple"), - TableMappings: e2e.TableMappings(s, srcTable, dstTable), - Destination: s.Peer(), - } - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - - tc := e2e.NewTemporalClient(t) - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - - e2e.SetupCDCFlowStatusQuery(t, env, connectionGen) - // insert 10 rows into the source table - for i := range 10 { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') - `, srcSchemaTable), testKey, testValue) - e2e.EnvNoError(t, env, err) - } - t.Log("Inserted 10 rows into the source table") - - e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,key,value,myh`) - env.Cancel() - e2e.RequireEnvCanceled(t, env) -} diff --git a/flow/e2e/postgres/postgres.go b/flow/e2e/postgres/postgres.go index 23ca778c8d..8eafd6ade0 100644 --- a/flow/e2e/postgres/postgres.go +++ b/flow/e2e/postgres/postgres.go @@ -9,6 +9,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -32,6 +33,10 @@ func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector { return s.conn } +func (s PeerFlowE2ETestSuitePG) DestinationConnector() connectors.Connector { + return s.conn +} + func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn { return s.conn.Conn() } diff --git a/flow/e2e/snowflake/snowflake.go b/flow/e2e/snowflake/snowflake.go index 45132ef601..b70b4b5bf1 100644 --- a/flow/e2e/snowflake/snowflake.go +++ b/flow/e2e/snowflake/snowflake.go @@ -10,6 +10,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" + "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" @@ -35,6 +36,10 @@ func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector { return s.conn } +func (s PeerFlowE2ETestSuiteSF) DestinationConnector() connectors.Connector { + return s.connector +} + func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { return s.Connector().Conn() } @@ -51,6 +56,10 @@ func (s PeerFlowE2ETestSuiteSF) DestinationTable(table string) string { return fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, table) } +func (s PeerFlowE2ETestSuiteSF) DestinationTableName(table string) string { + return strings.ToUpper(s.DestinationTable(table)) +} + func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error) { s.t.Helper() qualifiedTableName := fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index c018f32df2..7176a577de 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -22,6 +22,7 @@ import ( "go.temporal.io/sdk/converter" "go.temporal.io/sdk/temporal" + "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -55,6 +56,7 @@ type RowSource interface { type GenericSuite interface { RowSource Peer() *protos.Peer + DestinationConnector() connectors.Connector DestinationTable(table string) string } @@ -519,6 +521,19 @@ func GetOwnersSelectorStringsSF() [2]string { return [2]string{strings.Join(pgFields, ","), strings.Join(sfFields, ",")} } +func ExpectedDestinationIdentifier(s GenericSuite, ident string) string { + switch s.DestinationConnector().(type) { + case *connsnowflake.SnowflakeConnector: + return strings.ToUpper(ident) + default: + return ident + } +} + +func ExpectedDestinationTableName(s GenericSuite, table string) string { + return ExpectedDestinationIdentifier(s, s.DestinationTable(table)) +} + type testWriter struct { *testing.T }