diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 5f6899b15b..38bc636f51 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -12,7 +12,7 @@ jobs: matrix: runner: [ubicloud-standard-16-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} - timeout-minutes: 40 + timeout-minutes: 30 services: catalog: image: imresamu/postgis:15-3.4-alpine @@ -96,7 +96,7 @@ jobs: temporal operator search-attribute create --name MirrorName --type Text --namespace default ./peer-flow worker & ./peer-flow snapshot-worker & - go test -p 32 ./... -timeout 1200s + go test -p 32 ./... -timeout 900s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index c059c36836..e5603b02d9 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -196,23 +196,25 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PostgresSchemaDeltaTestSuite) { - teardownTx, err := s.connector.conn.Begin(context.Background()) - require.NoError(s.t, err) - defer func() { - err := teardownTx.Rollback(context.Background()) - if err != pgx.ErrTxClosed { - require.NoError(s.t, err) - } - }() - _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", - s.schema)) - require.NoError(s.t, err) - err = teardownTx.Commit(context.Background()) - require.NoError(s.t, err) - - require.NoError(s.t, s.connector.ConnectionActive(context.Background())) - require.NoError(s.t, s.connector.Close()) - require.Error(s.t, s.connector.ConnectionActive(context.Background())) - }) + e2eshared.RunSuite(t, SetupSuite) +} + +func (s PostgresSchemaDeltaTestSuite) Teardown() { + teardownTx, err := s.connector.conn.Begin(context.Background()) + require.NoError(s.t, err) + defer func() { + err := teardownTx.Rollback(context.Background()) + if err != pgx.ErrTxClosed { + require.NoError(s.t, err) + } + }() + _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", + s.schema)) + require.NoError(s.t, err) + err = teardownTx.Commit(context.Background()) + require.NoError(s.t, err) + + require.NoError(s.t, s.connector.ConnectionActive(context.Background())) + require.NoError(s.t, s.connector.Close()) + require.Error(s.t, s.connector.ConnectionActive(context.Background())) } diff --git a/flow/e2e/bigquery/bigquery.go b/flow/e2e/bigquery/bigquery.go new file mode 100644 index 0000000000..73f5c38d6e --- /dev/null +++ b/flow/e2e/bigquery/bigquery.go @@ -0,0 +1,102 @@ +package e2e_bigquery + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuiteBQ struct { + t *testing.T + + bqSuffix string + conn *connpostgres.PostgresConnector + bqHelper *BigQueryTestHelper +} + +func (s PeerFlowE2ETestSuiteBQ) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn { + return s.conn.Conn() +} + +func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s PeerFlowE2ETestSuiteBQ) Suffix() string { + return s.bqSuffix +} + +func (s PeerFlowE2ETestSuiteBQ) Peer() *protos.Peer { + return s.bqHelper.Peer +} + +func (s PeerFlowE2ETestSuiteBQ) DestinationTable(table string) string { + return table +} + +func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) + bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) + s.t.Logf("running query on bigquery: %s", bqSelQuery) + return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) +} + +func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) + bqSelQuery := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, qualifiedTableName, where) + s.t.Logf("running query on bigquery: %s", bqSelQuery) + return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) +} + +func (s PeerFlowE2ETestSuiteBQ) Teardown() { + e2e.TearDownPostgres(s) + + err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId) + if err != nil { + s.t.Fatalf("failed to tear down bigquery: %v", err) + } +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { + t.Helper() + + suffix := shared.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) + conn, err := e2e.SetupPostgres(t, bqSuffix) + if err != nil || conn == nil { + t.Fatalf("failed to setup postgres: %v", err) + } + + bqHelper, err := NewBigQueryTestHelper() + if err != nil { + t.Fatalf("Failed to create helper: %v", err) + } + + err = bqHelper.RecreateDataset() + if err != nil { + t.Fatalf("Failed to recreate dataset: %v", err) + } + + return PeerFlowE2ETestSuiteBQ{ + t: t, + bqSuffix: bqSuffix, + conn: conn, + bqHelper: bqHelper, + } +} diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 14e95deda2..ec28b5f97b 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -4,76 +4,23 @@ import ( "context" "errors" "fmt" - "strings" "testing" "time" "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" - connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) -type PeerFlowE2ETestSuiteBQ struct { - t *testing.T - - bqSuffix string - conn *connpostgres.PostgresConnector - bqHelper *BigQueryTestHelper -} - -func (s PeerFlowE2ETestSuiteBQ) T() *testing.T { - return s.t -} - -func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn { - return s.conn.Conn() -} - -func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector { - return s.conn -} - -func (s PeerFlowE2ETestSuiteBQ) Suffix() string { - return s.bqSuffix -} - -func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) { - s.t.Helper() - qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) - s.t.Logf("running query on bigquery: %s", bqSelQuery) - return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) -} - -func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) { - s.t.Helper() - qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqSelQuery := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, qualifiedTableName, where) - s.t.Logf("running query on bigquery: %s", bqSelQuery) - return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) -} - func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) { - e2e.TearDownPostgres(s) - - err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId) - if err != nil { - s.t.Fatalf("failed to tear down bigquery: %v", err) - } - }) + e2eshared.RunSuite(t, SetupSuite) } func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, value string) error { @@ -147,52 +94,6 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel return nil } -// setupBigQuery sets up the bigquery connection. -func setupBigQuery(t *testing.T) *BigQueryTestHelper { - t.Helper() - - bqHelper, err := NewBigQueryTestHelper() - if err != nil { - t.Fatalf("Failed to create helper: %v", err) - } - - err = bqHelper.RecreateDataset() - if err != nil { - t.Fatalf("Failed to recreate dataset: %v", err) - } - - return bqHelper -} - -// Implement SetupAllSuite interface to setup the test suite -func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { - t.Helper() - - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - - suffix := shared.RandomString(8) - tsSuffix := time.Now().Format("20060102150405") - bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) - conn, err := e2e.SetupPostgres(t, bqSuffix) - if err != nil || conn == nil { - t.Fatalf("failed to setup postgres: %v", err) - } - - bq := setupBigQuery(t) - - return PeerFlowE2ETestSuiteBQ{ - t: t, - bqSuffix: bqSuffix, - conn: conn, - bqHelper: bq, - } -} - func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { tc := e2e.NewTemporalClient(s.t) @@ -268,54 +169,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { e2e.RequireEnvCanceled(s.t, env) } -// Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. -// The test inserts 10 rows into the source table and verifies that the data is -// correctly synced to the destination table after sync flow completes. -func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { - tc := e2e.NewTemporalClient(s.t) - - srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") - dstTableName := "test_simple_flow_bq" - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - require.NoError(s.t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_complete_simple_flow"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.bqHelper.Peer, - CdcStagingPath: "", - } - - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 100 - - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - - // insert 10 rows into the source table - for i := range 10 { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - e2e.EnvNoError(s.t, env, err) - } - s.t.Log("Inserted 10 rows into the source table") - - e2e.EnvWaitForEqualTables(env, s, "normalize inserts", dstTableName, "id,key,value") - - env.Cancel() - e2e.RequireEnvCanceled(s.t, env) -} - func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { tc := e2e.NewTemporalClient(s.t) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index c7f6a5c7f8..ae0841f153 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -45,14 +45,9 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) { rows = append(rows, row) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO e2e_test_%s.%s ( - watermark_ts, - mytimestamp, - mytztimestamp, - medieval, - mybaddate, - mydate - ) VALUES %s; + INSERT INTO e2e_test_%s.%s ( + watermark_ts, mytimestamp, mytztimestamp, medieval, mybaddate, mydate + ) VALUES %s; `, s.bqSuffix, tableName, strings.Join(rows, ","))) require.NoError(s.t, err) } diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 82387c9ff3..2fd3c180e5 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -162,32 +162,36 @@ func GeneratePostgresPeer() *protos.Peer { type FlowConnectionGenerationConfig struct { FlowJobName string + TableMappings []*protos.TableMapping TableNameMapping map[string]string Destination *protos.Peer CdcStagingPath string SoftDelete bool } -// GenerateSnowflakePeer generates a snowflake peer config for testing. -func GenerateSnowflakePeer(snowflakeConfig *protos.SnowflakeConfig) (*protos.Peer, error) { - ret := &protos.Peer{} - ret.Name = "test_snowflake_peer" - ret.Type = protos.DBType_SNOWFLAKE - - ret.Config = &protos.Peer_SnowflakeConfig{ - SnowflakeConfig: snowflakeConfig, +func TableMappings(s GenericSuite, tables ...string) []*protos.TableMapping { + if len(tables)&1 != 0 { + panic("must receive even number of table names") } - - return ret, nil + tm := make([]*protos.TableMapping, 0, len(tables)/2) + for i := 0; i < len(tables); i += 2 { + tm = append(tm, &protos.TableMapping{ + SourceTableIdentifier: AttachSchema(s, tables[i]), + DestinationTableIdentifier: s.DestinationTable(tables[i+1]), + }) + } + return tm } func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() *protos.FlowConnectionConfigs { - tblMappings := []*protos.TableMapping{} - for k, v := range c.TableNameMapping { - tblMappings = append(tblMappings, &protos.TableMapping{ - SourceTableIdentifier: k, - DestinationTableIdentifier: v, - }) + tblMappings := c.TableMappings + if tblMappings == nil { + for k, v := range c.TableNameMapping { + tblMappings = append(tblMappings, &protos.TableMapping{ + SourceTableIdentifier: k, + DestinationTableIdentifier: v, + }) + } } ret := &protos.FlowConnectionConfigs{ diff --git a/flow/e2e/generic/peer_flow_test.go b/flow/e2e/generic/peer_flow_test.go new file mode 100644 index 0000000000..20c5847df4 --- /dev/null +++ b/flow/e2e/generic/peer_flow_test.go @@ -0,0 +1,82 @@ +package e2e_generic + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/bigquery" + "github.com/PeerDB-io/peer-flow/e2e/postgres" + "github.com/PeerDB-io/peer-flow/e2e/snowflake" + "github.com/PeerDB-io/peer-flow/e2eshared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" +) + +func TestGenericPG(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_postgres.SetupSuite)) +} + +func TestGenericSF(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_snowflake.SetupSuite)) +} + +func TestGenericBQ(t *testing.T) { + e2eshared.RunSuite(t, SetupGenericSuite(e2e_bigquery.SetupSuite)) +} + +type Generic struct { + e2e.GenericSuite +} + +func SetupGenericSuite[T e2e.GenericSuite](f func(t *testing.T) T) func(t *testing.T) Generic { + return func(t *testing.T) Generic { + t.Helper() + return Generic{f(t)} + } +} + +func (s Generic) Test_Simple_Flow() { + t := s.T() + srcTable := "test_simple" + dstTable := "test_simple_dst" + srcSchemaTable := e2e.AttachSchema(s, srcTable) + + _, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL, + myh HSTORE NOT NULL + ); + `, srcSchemaTable)) + require.NoError(t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, "test_simple"), + TableMappings: e2e.TableMappings(s, srcTable, dstTable), + Destination: s.Peer(), + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + + tc := e2e.NewTemporalClient(t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + + e2e.SetupCDCFlowStatusQuery(t, env, connectionGen) + // insert 10 rows into the source table + for i := range 10 { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') + `, srcSchemaTable), testKey, testValue) + e2e.EnvNoError(t, env, err) + } + t.Log("Inserted 10 rows into the source table") + + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,key,value,myh`) + env.Cancel() + e2e.RequireEnvCanceled(t, env) +} diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index fa55bbb3fc..2e69376b01 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -77,52 +77,6 @@ func (s PeerFlowE2ETestSuitePG) WaitForSchema( }) } -func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { - srcTableName := s.attachSchemaSuffix("test_simple_flow") - dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL, - myh HSTORE NOT NULL - ); - `, srcTableName)) - require.NoError(s.t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.peer, - } - - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 100 - - tc := e2e.NewTemporalClient(s.t) - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - // insert 10 rows into the source table - for i := range 10 { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') - `, srcTableName), testKey, testValue) - e2e.EnvNoError(s.t, env, err) - } - s.t.Log("Inserted 10 rows into the source table") - - e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize 10 rows", func() bool { - return s.comparePGTables(srcTableName, dstTableName, "id,key,value") == nil - }) - env.Cancel() - - e2e.RequireEnvCanceled(s.t, env) -} - func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { srcTableName := s.attachSchemaSuffix("test_geospatial_pg") dstTableName := s.attachSchemaSuffix("test_geospatial_pg_dst") @@ -1097,7 +1051,7 @@ func (s PeerFlowE2ETestSuitePG) Test_ContinueAsNew() { } s.t.Log("Inserted 144 rows into the source table") - e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize 72 syncs", func() bool { + e2e.EnvWaitFor(s.t, env, 4*time.Minute, "normalize 72 syncs", func() bool { return s.comparePGTables(srcTableName, dstTableName, "id,key,value") == nil }) env.Cancel() diff --git a/flow/e2e/postgres/postgres.go b/flow/e2e/postgres/postgres.go new file mode 100644 index 0000000000..23ca778c8d --- /dev/null +++ b/flow/e2e/postgres/postgres.go @@ -0,0 +1,79 @@ +package e2e_postgres + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuitePG struct { + t *testing.T + + conn *connpostgres.PostgresConnector + peer *protos.Peer + suffix string +} + +func (s PeerFlowE2ETestSuitePG) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn { + return s.conn.Conn() +} + +func (s PeerFlowE2ETestSuitePG) Suffix() string { + return s.suffix +} + +func (s PeerFlowE2ETestSuitePG) Peer() *protos.Peer { + return s.peer +} + +func (s PeerFlowE2ETestSuitePG) DestinationTable(table string) string { + return e2e.AttachSchema(s, table) +} + +func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error) { + s.t.Helper() + pgQueryExecutor := s.conn.NewQRepQueryExecutor("testflow", "testpart") + pgQueryExecutor.SetTestEnv(true) + + return pgQueryExecutor.ExecuteAndProcessQuery( + context.Background(), + fmt.Sprintf(`SELECT %s FROM e2e_test_%s.%s ORDER BY id`, cols, s.suffix, connpostgres.QuoteIdentifier(table)), + ) +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { + t.Helper() + + suffix := "pg_" + strings.ToLower(shared.RandomString(8)) + conn, err := e2e.SetupPostgres(t, suffix) + require.NoError(t, err, "failed to setup postgres") + + return PeerFlowE2ETestSuitePG{ + t: t, + conn: conn, + peer: e2e.GeneratePostgresPeer(), + suffix: suffix, + } +} + +func (s PeerFlowE2ETestSuitePG) Teardown() { + e2e.TearDownPostgres(s) +} diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index fb49d3d242..abb7867d24 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -10,7 +10,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -21,56 +20,8 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -type PeerFlowE2ETestSuitePG struct { - t *testing.T - - conn *connpostgres.PostgresConnector - peer *protos.Peer - suffix string -} - -func (s PeerFlowE2ETestSuitePG) T() *testing.T { - return s.t -} - -func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn { - return s.conn.Conn() -} - -func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector { - return s.conn -} - -func (s PeerFlowE2ETestSuitePG) Suffix() string { - return s.suffix -} - func TestPeerFlowE2ETestSuitePG(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuitePG) { - e2e.TearDownPostgres(s) - }) -} - -func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { - t.Helper() - - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - - suffix := "pg_" + strings.ToLower(shared.RandomString(8)) - conn, err := e2e.SetupPostgres(t, suffix) - require.NoError(t, err, "failed to setup postgres") - - return PeerFlowE2ETestSuitePG{ - t: t, - conn: conn, - peer: e2e.GeneratePostgresPeer(), - suffix: suffix, - } + e2eshared.RunSuite(t, SetupSuite) } func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 240f3a78b9..d59378d52c 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -36,21 +35,12 @@ func (s PeerFlowE2ETestSuiteS3) Suffix() string { return s.suffix } -func tearDownSuite(s PeerFlowE2ETestSuiteS3) { - e2e.TearDownPostgres(s) - - err := s.s3Helper.CleanUp(context.Background()) - if err != nil { - require.Fail(s.t, "failed to clean up s3", err) - } -} - func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - e2eshared.RunSuite(t, SetupSuiteS3, tearDownSuite) + e2eshared.RunSuite(t, SetupSuiteS3) } func TestPeerFlowE2ETestSuiteGCS(t *testing.T) { - e2eshared.RunSuite(t, SetupSuiteGCS, tearDownSuite) + e2eshared.RunSuite(t, SetupSuiteGCS) } func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { @@ -63,13 +53,6 @@ func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 { t.Helper() - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - suffix := "s3_" + strings.ToLower(shared.RandomString(8)) conn, err := e2e.SetupPostgres(t, suffix) if err != nil || conn == nil { @@ -89,6 +72,15 @@ func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 { } } +func (s PeerFlowE2ETestSuiteS3) Teardown() { + e2e.TearDownPostgres(s) + + err := s.s3Helper.CleanUp(context.Background()) + if err != nil { + require.Fail(s.t, "failed to clean up s3", err) + } +} + func SetupSuiteS3(t *testing.T) PeerFlowE2ETestSuiteS3 { t.Helper() return setupSuite(t, false) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 9eaf491e47..525d2c7256 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -9,176 +9,27 @@ import ( "time" "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" - connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" - connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) -type PeerFlowE2ETestSuiteSF struct { - t *testing.T - - pgSuffix string - conn *connpostgres.PostgresConnector - sfHelper *SnowflakeTestHelper - connector *connsnowflake.SnowflakeConnector -} - -func (s PeerFlowE2ETestSuiteSF) T() *testing.T { - return s.t -} - -func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector { - return s.conn -} - -func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { - return s.Connector().Conn() -} - -func (s PeerFlowE2ETestSuiteSF) Suffix() string { - return s.pgSuffix -} - -func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error) { - s.t.Helper() - qualifiedTableName := fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) - sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) - s.t.Logf("running query on snowflake: %s", sfSelQuery) - return s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) -} - func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { - e2e.TearDownPostgres(s) - - if s.sfHelper != nil { - err := s.sfHelper.Cleanup() - if err != nil { - s.t.Fatalf("failed to tear down Snowflake: %v", err) - } - } - - err := s.connector.Close() - if err != nil { - s.t.Fatalf("failed to close Snowflake connector: %v", err) - } - }) + e2eshared.RunSuite(t, SetupSuite) } func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tableName) + return e2e.AttachSchema(s, tableName) } func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, s.pgSuffix) -} - -func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { - t.Helper() - - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - - suffix := shared.RandomString(8) - tsSuffix := time.Now().Format("20060102150405") - pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) - - conn, err := e2e.SetupPostgres(t, pgSuffix) - if err != nil || conn == nil { - t.Fatalf("failed to setup Postgres: %v", err) - } - - sfHelper, err := NewSnowflakeTestHelper() - if err != nil { - t.Fatalf("failed to setup Snowflake: %v", err) - } - - connector, err := connsnowflake.NewSnowflakeConnector( - context.Background(), - sfHelper.Config, - ) - require.NoError(t, err) - - suite := PeerFlowE2ETestSuiteSF{ - t: t, - pgSuffix: pgSuffix, - conn: conn, - sfHelper: sfHelper, - connector: connector, - } - - return suite -} - -func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { - tc := e2e.NewTemporalClient(s.t) - - tableName := "test_simple_flow_sf" - srcTableName := s.attachSchemaSuffix(tableName) - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - require.NoError(s.t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix(tableName), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.sfHelper.Peer, - } - - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 100 - - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - // wait for PeerFlowStatusQuery to finish setup - // and then insert 20 rows into the source table - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - // insert 20 rows into the source table - for i := range 20 { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - e2e.EnvNoError(s.t, env, err) - } - s.t.Log("Inserted 20 rows into the source table") - e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,key,value") - - env.Cancel() - - e2e.RequireEnvCanceled(s.t, env) - - // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago - // it should match the count. - newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE' - `, dstTableName) - numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - require.NoError(s.t, err) - require.Equal(s.t, 20, numNewRows) + return e2e.AddSuffix(s, input) } func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { diff --git a/flow/e2e/snowflake/snowflake.go b/flow/e2e/snowflake/snowflake.go new file mode 100644 index 0000000000..45132ef601 --- /dev/null +++ b/flow/e2e/snowflake/snowflake.go @@ -0,0 +1,110 @@ +package e2e_snowflake + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" +) + +type PeerFlowE2ETestSuiteSF struct { + t *testing.T + + pgSuffix string + conn *connpostgres.PostgresConnector + sfHelper *SnowflakeTestHelper + connector *connsnowflake.SnowflakeConnector +} + +func (s PeerFlowE2ETestSuiteSF) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn { + return s.Connector().Conn() +} + +func (s PeerFlowE2ETestSuiteSF) Suffix() string { + return s.pgSuffix +} + +func (s PeerFlowE2ETestSuiteSF) Peer() *protos.Peer { + return s.sfHelper.Peer +} + +func (s PeerFlowE2ETestSuiteSF) DestinationTable(table string) string { + return fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, table) +} + +func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error) { + s.t.Helper() + qualifiedTableName := fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) + sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) + s.t.Logf("running query on snowflake: %s", sfSelQuery) + return s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) +} + +func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { + t.Helper() + + suffix := shared.RandomString(8) + tsSuffix := time.Now().Format("20060102150405") + pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) + + conn, err := e2e.SetupPostgres(t, pgSuffix) + if err != nil || conn == nil { + t.Fatalf("failed to setup Postgres: %v", err) + } + + sfHelper, err := NewSnowflakeTestHelper() + if err != nil { + t.Fatalf("failed to setup Snowflake: %v", err) + } + + connector, err := connsnowflake.NewSnowflakeConnector( + context.Background(), + sfHelper.Config, + ) + require.NoError(t, err) + + suite := PeerFlowE2ETestSuiteSF{ + t: t, + pgSuffix: pgSuffix, + conn: conn, + sfHelper: sfHelper, + connector: connector, + } + + return suite +} + +func (s PeerFlowE2ETestSuiteSF) Teardown() { + e2e.TearDownPostgres(s) + + if s.sfHelper != nil { + err := s.sfHelper.Cleanup() + if err != nil { + s.t.Fatalf("failed to tear down Snowflake: %v", err) + } + } + + err := s.connector.Close() + if err != nil { + s.t.Fatalf("failed to close Snowflake connector: %v", err) + } +} diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index 68c70e56aa..d607e90451 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -304,9 +304,11 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) } +func (s SnowflakeSchemaDeltaTestSuite) Teardown() { + require.NoError(s.t, s.sfTestHelper.Cleanup()) + require.NoError(s.t, s.connector.Close()) +} + func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { - e2eshared.RunSuite(t, setupSchemaDeltaSuite, func(s SnowflakeSchemaDeltaTestSuite) { - require.NoError(s.t, s.sfTestHelper.Cleanup()) - require.NoError(s.t, s.connector.Close()) - }) + e2eshared.RunSuite(t, setupSchemaDeltaSuite) } diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 448d0f32b7..153fa07cc6 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -11,7 +11,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/joho/godotenv" "github.com/stretchr/testify/require" "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -48,26 +47,21 @@ func (s PeerFlowE2ETestSuiteSQLServer) Suffix() string { } func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) { - e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSQLServer) { - e2e.TearDownPostgres(s) + e2eshared.RunSuite(t, SetupSuite) +} - if s.sqlsHelper != nil { - err := s.sqlsHelper.CleanUp() - require.NoError(s.t, err) - } - }) +func (s PeerFlowE2ETestSuiteSQLServer) Teardown() { + e2e.TearDownPostgres(s) + + if s.sqlsHelper != nil { + err := s.sqlsHelper.CleanUp() + require.NoError(s.t, err) + } } func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSQLServer { t.Helper() - err := godotenv.Load() - if err != nil { - // it's okay if the .env file is not present - // we will use the default values - t.Log("Unable to load .env file, using default values from env") - } - suffix := "sqls_" + strings.ToLower(shared.RandomString(8)) conn, err := e2e.SetupPostgres(t, suffix) if err != nil { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 8592eb406e..c018f32df2 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -15,6 +15,7 @@ import ( "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" + "github.com/joho/godotenv" "github.com/stretchr/testify/require" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/client" @@ -33,7 +34,14 @@ import ( peerflow "github.com/PeerDB-io/peer-flow/workflows" ) +func init() { + // it's okay if the .env file is not present + // we will use the default values + _ = godotenv.Load() +} + type Suite interface { + e2eshared.Suite T() *testing.T Connector() *connpostgres.PostgresConnector Suffix() string @@ -44,6 +52,20 @@ type RowSource interface { GetRows(table, cols string) (*model.QRecordBatch, error) } +type GenericSuite interface { + RowSource + Peer() *protos.Peer + DestinationTable(table string) string +} + +func AttachSchema(s Suite, table string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.Suffix(), table) +} + +func AddSuffix(s Suite, str string) string { + return fmt.Sprintf("%s_%s", str, s.Suffix()) +} + // Helper function to assert errors in go routines running concurrent to workflows // This achieves two goals: // 1. cancel workflow to avoid waiting on goroutine which has failed @@ -129,11 +151,13 @@ func EnvWaitForEqualTablesWithNames( pgRows, err := GetPgRows(suite.Connector(), suite.Suffix(), srcTable, cols) if err != nil { + t.Log(err) return false } rows, err := suite.GetRows(dstTable, cols) if err != nil { + t.Log(err) return false } @@ -163,18 +187,21 @@ func SetupCDCFlowStatusQuery(t *testing.T, env WorkflowRun, connectionGen FlowCo for { time.Sleep(time.Second) counter++ - response, err := env.Query(shared.CDCFlowStateQuery, connectionGen.FlowJobName) + response, err := env.Query(shared.FlowStatusQuery, connectionGen.FlowJobName) if err == nil { - var state peerflow.CDCFlowWorkflowState - err = response.Get(&state) + var status protos.FlowStatus + err = response.Get(&status) if err != nil { t.Fatal(err) - } else if state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING { + } else if status == protos.FlowStatus_STATUS_RUNNING { return + } else if counter > 30 { + env.Cancel() + t.Fatal("UNEXPECTED STATUS TIMEOUT", status) } } else if counter > 15 { env.Cancel() - t.Fatal("UNEXPECTED SETUP CDC TIMEOUT", err.Error()) + t.Fatal("UNEXPECTED STATUS QUERY TIMEOUT", err.Error()) } else if counter > 5 { // log the error for informational purposes t.Log(err.Error()) diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index b242ebb1ea..087ff58014 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -12,7 +12,11 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) -func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) { +type Suite interface { + Teardown() +} + +func RunSuite[T Suite](t *testing.T, setup func(t *testing.T) T) { t.Helper() t.Parallel() @@ -26,7 +30,7 @@ func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) subtest.Parallel() suite := setup(subtest) subtest.Cleanup(func() { - teardown(suite) + suite.Teardown() }) m.Func.Call([]reflect.Value{reflect.ValueOf(suite)}) }) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 972063d494..013e0ca9e1 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -8,6 +8,7 @@ import ( "math/big" "reflect" "strconv" + "strings" "time" "cloud.google.com/go/civil" @@ -271,11 +272,14 @@ func compareHstore(value1, value2 interface{}) bool { } return string(bytes) == str2 case string: + if v1 == str2 { + return true + } parsedHStore1, err := hstore_util.ParseHstore(v1) if err != nil { panic(err) } - return parsedHStore1 == str2 + return parsedHStore1 == strings.ReplaceAll(strings.ReplaceAll(str2, " ", ""), "\n", "") default: panic(fmt.Sprintf("invalid hstore value type %T: %v", value1, value1)) } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 14ca09092d..bab81b3b23 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -41,7 +41,6 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping)) } return &CDCFlowWorkflowState{ - // 1 more than the limit of 10 ActiveSignal: model.NoopSignal, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, FlowConfigUpdate: nil, diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index ccb699d8a4..ac3e66f93c 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -227,7 +227,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( } normalizedTableMapping[normalizedTableName] = tableSchema - s.logger.Info("normalized table schema: ", normalizedTableName, " -> ", tableSchema) + s.logger.Info("normalized table schema", slog.String("table", normalizedTableName), slog.Any("schema", tableSchema)) } // now setup the normalized tables on the destination peer