diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 408ee44bf1..3ff86eebd1 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -223,6 +223,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name) + shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string { + jobName := input.FlowConnectionConfigs.FlowJobName + return fmt.Sprintf("transferring records for job - %s", jobName) + }) + defer shutdown() + // start a goroutine to pull records from the source recordBatch := model.NewCDCRecordStream() startTime := time.Now() @@ -283,12 +289,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return syncResponse, nil } - shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string { - jobName := input.FlowConnectionConfigs.FlowJobName - return fmt.Sprintf("pushing records for job - %s", jobName) - }) - defer shutdown() - syncStartTime := time.Now() res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{ Records: recordBatch, diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index af2b483de1..4fd1b3dd79 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -263,8 +263,8 @@ func (p *PostgresCDCSource) consumeStream( if cdcRecordsStorage.Len() == 1 { records.SignalAsNotEmpty() - p.logger.Info(fmt.Sprintf("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout))) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + p.logger.Info(fmt.Sprintf("pushing the standby deadline to %s", nextStandbyMessageDeadline)) } return nil } @@ -297,17 +297,19 @@ func (p *PostgresCDCSource) consumeStream( } } - if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock { - return nil - } + if !p.commitLock { + if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) { + return nil + } - if 
waitingForCommit && !p.commitLock { - p.logger.Info(fmt.Sprintf( - "[%s] commit received, returning currently accumulated records - %d", - p.flowJobName, - cdcRecordsStorage.Len()), - ) - return nil + if waitingForCommit { + p.logger.Info(fmt.Sprintf( + "[%s] commit received, returning currently accumulated records - %d", + p.flowJobName, + cdcRecordsStorage.Len()), + ) + return nil + } } // if we are past the next standby deadline (?) @@ -340,9 +342,15 @@ func (p *PostgresCDCSource) consumeStream( } else { ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline) } - rawMsg, err := conn.ReceiveMessage(ctx) cancel() + + utils.RecordHeartbeatWithRecover(p.ctx, "consumeStream ReceiveMessage") + ctxErr := p.ctx.Err() + if ctxErr != nil { + return fmt.Errorf("consumeStream preempted: %w", ctxErr) + } + if err != nil && !p.commitLock { if pgconn.Timeout(err) { p.logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d", diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 9143483d41..b823ca772a 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -248,8 +248,7 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T return nil, fmt.Errorf("error querying Snowflake peer for schema of table %s: %w", tableName, err) } defer func() { - // not sure if the errors these two return are same or different? - err = errors.Join(rows.Close(), rows.Err()) + err = rows.Close() if err != nil { c.logger.Error("error while closing rows for reading schema of table", slog.String("tableName", tableName), @@ -289,8 +288,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) { return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) } defer func() { - // not sure if the errors these two return are same or different? 
- err = errors.Join(rows.Close(), rows.Err()) + err = rows.Close() if err != nil { c.logger.Error("error while closing rows for reading last offset", slog.Any("error", err)) } diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index 270680ded1..fda4ea06d2 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -30,7 +30,7 @@ func HeartbeatRoutine( } } }() - return func() { shutdown <- struct{}{} } + return func() { close(shutdown) } } // if the functions are being called outside the context of a Temporal workflow, diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 5851bc98f6..267ff3d69d 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -29,8 +29,6 @@ type BigQueryTestHelper struct { Peer *protos.Peer // client to talk to BigQuery client *bigquery.Client - // dataset to use for testing. - datasetName string } // NewBigQueryTestHelper creates a new BigQueryTestHelper. @@ -51,7 +49,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) { return nil, fmt.Errorf("failed to read file: %w", err) } - var config protos.BigqueryConfig + var config *protos.BigqueryConfig err = json.Unmarshal(content, &config) if err != nil { return nil, fmt.Errorf("failed to unmarshal json: %w", err) @@ -60,7 +58,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) { // suffix the dataset with the runID to namespace stateful schemas. 
config.DatasetId = fmt.Sprintf("%s_%d", config.DatasetId, runID) - bqsa, err := peer_bq.NewBigQueryServiceAccount(&config) + bqsa, err := peer_bq.NewBigQueryServiceAccount(config) if err != nil { return nil, fmt.Errorf("failed to create BigQueryServiceAccount: %v", err) } @@ -70,14 +68,13 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) { return nil, fmt.Errorf("failed to create helper BigQuery client: %v", err) } - peer := generateBQPeer(&config) + peer := generateBQPeer(config) return &BigQueryTestHelper{ - runID: runID, - Config: &config, - client: client, - datasetName: config.DatasetId, - Peer: peer, + runID: runID, + Config: config, + client: client, + Peer: peer, }, nil } @@ -115,12 +112,12 @@ func (b *BigQueryTestHelper) datasetExists(datasetName string) (bool, error) { // RecreateDataset recreates the dataset, i.e, deletes it if exists and creates it again. func (b *BigQueryTestHelper) RecreateDataset() error { - exists, err := b.datasetExists(b.datasetName) + exists, err := b.datasetExists(b.Config.DatasetId) if err != nil { return fmt.Errorf("failed to check if dataset %s exists: %w", b.Config.DatasetId, err) } - dataset := b.client.Dataset(b.datasetName) + dataset := b.client.Dataset(b.Config.DatasetId) if exists { err := dataset.DeleteWithContents(context.Background()) if err != nil { @@ -158,7 +155,9 @@ func (b *BigQueryTestHelper) DropDataset(datasetName string) error { // RunCommand runs the given command. func (b *BigQueryTestHelper) RunCommand(command string) error { - _, err := b.client.Query(command).Read(context.Background()) + q := b.client.Query(command) + q.DisableQueryCache = true + _, err := q.Read(context.Background()) if err != nil { return fmt.Errorf("failed to run command: %w", err) } @@ -168,7 +167,7 @@ func (b *BigQueryTestHelper) RunCommand(command string) error { // countRows(tableName) returns the number of rows in the given table. 
func (b *BigQueryTestHelper) countRows(tableName string) (int, error) { - return b.countRowsWithDataset(b.datasetName, tableName, "") + return b.countRowsWithDataset(b.Config.DatasetId, tableName, "") } func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, nonNullCol string) (int, error) { @@ -177,7 +176,9 @@ func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, non command = fmt.Sprintf("SELECT COUNT(CASE WHEN " + nonNullCol + " IS NOT NULL THEN 1 END) AS non_null_count FROM `" + dataset + "." + tableName + "`;") } - it, err := b.client.Query(command).Read(context.Background()) + q := b.client.Query(command) + q.DisableQueryCache = true + it, err := q.Read(context.Background()) if err != nil { return 0, fmt.Errorf("failed to run command: %w", err) } @@ -305,7 +306,9 @@ func bqSchemaToQRecordSchema(schema bigquery.Schema) (*model.QRecordSchema, erro } func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecordBatch, error) { - it, err := b.client.Query(query).Read(context.Background()) + q := b.client.Query(query) + q.DisableQueryCache = true + it, err := q.Read(context.Background()) if err != nil { return nil, fmt.Errorf("failed to run command: %w", err) } @@ -358,10 +361,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor }, nil } -/* -if the function errors or there are nulls, the function returns false -else true -*/ +// returns false if the function errors or there are nulls, true otherwise func (b *BigQueryTestHelper) CheckNull(tableName string, ColName []string) (bool, error) { if len(ColName) == 0 { return true, nil @@ -369,7 +369,9 @@ func (b *BigQueryTestHelper) CheckNull(tableName string, ColName []string) (bool joinedString := strings.Join(ColName, " is null or ") + " is null" command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s` WHERE %s", b.Config.DatasetId, tableName, joinedString) - it, err := b.client.Query(command).Read(context.Background()) + q := 
b.client.Query(command) + q.DisableQueryCache = true + it, err := q.Read(context.Background()) if err != nil { return false, fmt.Errorf("failed to run command: %w", err) } @@ -401,7 +403,9 @@ func (b *BigQueryTestHelper) CheckDoubleValues(tableName string, ColName []strin csep := strings.Join(ColName, ",") command := fmt.Sprintf("SELECT %s FROM `%s.%s`", csep, b.Config.DatasetId, tableName) - it, err := b.client.Query(command).Read(context.Background()) + q := b.client.Query(command) + q.DisableQueryCache = true + it, err := q.Read(context.Background()) if err != nil { return false, fmt.Errorf("failed to run command: %w", err) } @@ -474,7 +478,7 @@ func (b *BigQueryTestHelper) CreateTable(tableName string, schema *model.QRecord fields = append(fields, fmt.Sprintf("`%s` %s", field.Name, bqType)) } - command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", b.datasetName, tableName, strings.Join(fields, ", ")) + command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", b.Config.DatasetId, tableName, strings.Join(fields, ", ")) err := b.RunCommand(command) if err != nil { diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 8c45c29872..4408572ca1 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -6,7 +6,6 @@ import ( "fmt" "log/slog" "strings" - "sync" "testing" "time" @@ -46,21 +45,26 @@ func (s PeerFlowE2ETestSuiteBQ) Suffix() string { } func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) { + s.t.Helper() qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) s.t.Logf("running query on bigquery: %s", bqSelQuery) return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) } +func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error) { + s.t.Helper() + 
qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) + bqSelQuery := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY id", colsString, qualifiedTableName, where) + s.t.Logf("running query on bigquery: %s", bqSelQuery) + return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) +} + func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) { - err := e2e.TearDownPostgres(s.pool, s.bqSuffix) - if err != nil { - slog.Error("failed to tear down postgres", slog.Any("error", err)) - s.t.FailNow() - } + e2e.TearDownPostgres(s) - err = s.bqHelper.DropDataset(s.bqHelper.datasetName) + err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId) if err != nil { slog.Error("failed to tear down bigquery", slog.Any("error", err)) s.t.FailNow() @@ -71,7 +75,7 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, value string) error { res, err := s.bqHelper.ExecuteAndProcessQuery(fmt.Sprintf( "SELECT `%s`.%s FROM `%s.%s`;", - colName, fieldName, s.bqHelper.datasetName, tableName)) + colName, fieldName, s.bqHelper.Config.DatasetId, tableName)) if err != nil { return fmt.Errorf("json value check failed: %v", err) } @@ -323,7 +327,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -388,7 +392,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, 
connectionGen) /* Executing a transaction which 1. changes both toast column @@ -455,7 +459,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { // and execute a transaction touching toast columns done := make(chan struct{}) go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* transaction updating no rows */ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -516,7 +520,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -587,7 +591,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -653,7 +657,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* transaction updating a single row multiple times with changed/unchanged toast columns @@ -724,7 +728,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - 
e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', @@ -807,7 +811,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2, 'NaN'::double precision, '{NaN, Infinity, -Infinity}'; @@ -864,7 +868,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -901,10 +905,10 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { // We inserted 4 invalid shapes in each. 
// They should have been filtered out as null on destination - lineCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "line") + lineCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.Config.DatasetId, dstTableName, "line") require.NoError(s.t, err) - polyCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.datasetName, dstTableName, "`polyPoly`") + polyCount, err := s.bqHelper.countRowsWithDataset(s.bqHelper.Config.DatasetId, dstTableName, "`polyPoly`") require.NoError(s.t, err) require.Equal(s.t, 6, lineCount) @@ -944,7 +948,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); @@ -974,8 +978,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") - dstTableName := "test_simple_schema_changes" + tableName := "test_simple_schema_changes" + srcTableName := s.attachSchemaSuffix(tableName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -986,8 +990,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_schema_changes"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, + FlowJobName: s.attachSuffix(tableName), + TableNameMapping: map[string]string{srcTableName: tableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", @@ -996,7 +1000,7 @@ 
func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 1, + ExitAfterRecords: -1, MaxBatchSize: 100, } @@ -1004,15 +1008,13 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // and then insert and mutate schema repeatedly. go func() { // insert first row. - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) + INSERT INTO %s(c1) VALUES (1)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") - // verify we got our first row. - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") + e2e.EnvWaitForEqualTables(env, s, "normalize insert", tableName, "id,c1") // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1020,13 +1022,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) + INSERT INTO %s(c1,c2) VALUES (2,2)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. - e2e.NormalizeFlowCountQuery(env, connectionGen, 4) - e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2") + e2e.EnvWaitForEqualTables(env, s, "normalize altered row", tableName, "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. 
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1034,13 +1035,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) + INSERT INTO %s(c1,c3) VALUES (3,3)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. - e2e.NormalizeFlowCountQuery(env, connectionGen, 6) - e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3") + e2e.EnvWaitForEqualTables(env, s, "normalize altered row", tableName, "id,c1,c3") // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1048,31 +1048,25 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) + INSERT INTO %s(c1) VALUES (4)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. 
- e2e.NormalizeFlowCountQuery(env, connectionGen, 8) - e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") + e2e.EnvWaitForEqualTables(env, s, "normalize drop column", tableName, "id,c1") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) + tableName := "test_simple_cpkey" srcTableName := s.attachSchemaSuffix("test_simple_cpkey") - dstTableName := "test_simple_cpkey" _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1087,7 +1081,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, + TableNameMapping: map[string]string{srcTableName: tableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", @@ -1096,14 +1090,14 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 10, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) @@ -1115,26 +1109,20 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { s.t.Log("Inserted 10 rows into the source table") // verify we got our 10 rows - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - e2e.EnvEqualTables(env, s, dstTableName, "id,c1,c2,t") + e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1,c2,t") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) - }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.EnvWaitForEqualTables(env, s, "normalize update", tableName, "id,c1,c2,t") - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") + env.CancelWorkflow() + }() - e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t") + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { @@ -1174,7 +1162,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1215,8 +1203,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) + tableName := "test_cpkey_toast2" srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") - dstTableName := "test_cpkey_toast2" _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1232,7 +1220,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, + TableNameMapping: map[string]string{srcTableName: tableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", @@ -1241,14 +1229,14 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 10, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1260,25 +1248,18 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } s.t.Log("Inserted 10 rows into the source table") - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c2,t,t2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTables(env, s, "normalize update", tableName, "id,c2,t,t2") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") - - // verify our updates and delete happened - e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t,t2") } func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { @@ -1312,7 +1293,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { } go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 1 row into the source table testKey := fmt.Sprintf("test_key_%d", 1) testValue := fmt.Sprintf("test_value_%d", 1) @@ -1347,7 +1328,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { srcTable1Name := s.attachSchemaSuffix("test1_bq") dstTable1Name := "test1_bq" - secondDataset := fmt.Sprintf("%s_2", s.bqHelper.datasetName) + secondDataset := fmt.Sprintf("%s_2", s.bqHelper.Config.DatasetId) srcTable2Name := s.attachSchemaSuffix("test2_bq") 
dstTable2Name := "test2_bq" @@ -1378,7 +1359,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); @@ -1410,9 +1391,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - cmpTableName := s.attachSchemaSuffix("test_softdel") - srcTableName := fmt.Sprintf("%s_src", cmpTableName) - dstTableName := "test_softdel" + tableName := "test_softdel" + srcName := "test_softdel_src" + srcTableName := s.attachSchemaSuffix(srcName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1425,7 +1406,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_softdel"), + FlowJobName: s.attachSuffix(tableName), } config := &protos.FlowConnectionConfigs{ @@ -1434,7 +1415,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { TableMappings: []*protos.TableMapping{ { SourceTableIdentifier: srcTableName, - DestinationTableIdentifier: dstTableName, + DestinationTableIdentifier: tableName, }, }, Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), @@ -1445,49 +1426,46 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 3, + ExitAfterRecords: -1, MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - defer wg.Done() - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize insert", srcName, tableName, "id,c1,c2,t") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - // since we delete stuff, create another table to compare with - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update", srcName, tableName, "id,c1,c2,t") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool { + pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, srcName, "id,c1,c2,t") + if err != nil { + return false + } + rows, err := s.GetRowsWhere(tableName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED") + if err != nil { + return false + } + return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) + }) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - - wg.Wait() - - // verify our updates and delete happened - e2e.RequireEqualTables(s, "test_softdel", "id,c1,c2,t") - newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, - s.bqHelper.datasetName, dstTableName) + newerSyncedAtQuery := 
fmt.Sprintf( + "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", + s.bqHelper.Config.DatasetId, tableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) require.Equal(s.t, int64(1), numNewRows) @@ -1539,7 +1517,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1569,9 +1547,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { // verify our updates and delete happened e2e.RequireEqualTables(s, "test_softdel_iud", "id,c1,c2,t") - newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, - s.bqHelper.datasetName, dstTableName) + newerSyncedAtQuery := fmt.Sprintf( + "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", + s.bqHelper.Config.DatasetId, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) require.Equal(s.t, int64(1), numNewRows) @@ -1581,9 +1559,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - cmpTableName := s.attachSchemaSuffix("test_softdel_ud") - srcTableName := fmt.Sprintf("%s_src", cmpTableName) - dstTableName := "test_softdel_ud" + srcName := "test_softdel_ud_src" + srcTableName := s.attachSchemaSuffix(srcName) + dstName := "test_softdel_ud" _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1596,7 +1574,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: 
s.attachSuffix("test_softdel_ud"), + FlowJobName: s.attachSuffix(dstName), } config := &protos.FlowConnectionConfigs{ @@ -1605,7 +1583,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { TableMappings: []*protos.TableMapping{ { SourceTableIdentifier: srcTableName, - DestinationTableIdentifier: dstTableName, + DestinationTableIdentifier: dstName, }, }, Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), @@ -1616,19 +1594,19 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 4, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize insert", srcName, dstName, "id,c1,c2,t") insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1638,39 +1616,40 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - // since we delete stuff, create another table to compare with - _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) + + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize transaction", func() bool { 
+ pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, srcName, "id,c1,c2,t") + e2e.EnvNoError(s.t, env, err) + rows, err := s.GetRowsWhere(dstName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED") + if err != nil { + return false + } + return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) + }) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - // verify our updates and delete happened - e2e.RequireEqualTables(s, "test_softdel_ud", "id,c1,c2,t") - - newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, - s.bqHelper.datasetName, dstTableName) + newerSyncedAtQuery := fmt.Sprintf( + "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", + s.bqHelper.Config.DatasetId, dstName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, int64(1), numNewRows) + require.Equal(s.t, int64(0), numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - srcTableName := s.attachSchemaSuffix("test_softdel_iad") - dstTableName := "test_softdel_iad" + tableName := "test_softdel_iad" + srcTableName := s.attachSchemaSuffix(tableName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1683,7 +1662,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_softdel_iad"), + FlowJobName: s.attachSuffix(tableName), } config := &protos.FlowConnectionConfigs{ @@ -1692,7 +1671,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { TableMappings: []*protos.TableMapping{ { 
SourceTableIdentifier: srcTableName, - DestinationTableIdentifier: dstTableName, + DestinationTableIdentifier: tableName, }, }, Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), @@ -1703,39 +1682,46 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 3, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitForEqualTables(env, s, "normalize insert", tableName, "id,c1,c2,t") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool { + pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, tableName, "id,c1,c2,t") + if err != nil { + return false + } + rows, err := s.GetRowsWhere(tableName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED") + if err != nil { + return false + } + return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) + }) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", tableName, "id,c1,c2,t") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - - // verify our updates and delete 
happened - e2e.RequireEqualTables(s, "test_softdel_iad", "id,c1,c2,t") - newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, - s.bqHelper.datasetName, dstTableName) + newerSyncedAtQuery := fmt.Sprintf( + "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", + s.bqHelper.Config.DatasetId, tableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) require.Equal(s.t, int64(0), numNewRows) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 0e1789fff6..f324a8e8f5 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -4,12 +4,15 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" ) const ( @@ -134,16 +137,26 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) { return pool, nil } -func TearDownPostgres(pool *pgxpool.Pool, suffix string) error { - // drop the e2e_test schema +func TearDownPostgres[T e2eshared.Suite](s T) { + t := s.T() + t.Helper() + pool := s.Pool() + suffix := s.Suffix() + if pool != nil { - err := cleanPostgres(pool, suffix) - if err != nil { - return err + t.Log("begin tearing down postgres schema", suffix) + deadline := time.Now().Add(2 * time.Minute) + for { + err := cleanPostgres(pool, suffix) + if err == nil { + pool.Close() + return + } else if time.Now().After(deadline) { + require.Fail(t, "failed to teardown postgres schema", suffix) + } + time.Sleep(time.Second) } - pool.Close() } - return nil } // GeneratePostgresPeer generates a postgres peer config for testing. 
@@ -196,18 +209,19 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() *protos }) } - ret := &protos.FlowConnectionConfigs{} - ret.FlowJobName = c.FlowJobName - ret.TableMappings = tblMappings - ret.Source = GeneratePostgresPeer(c.PostgresPort) - ret.Destination = c.Destination - ret.CdcStagingPath = c.CdcStagingPath - ret.SoftDelete = c.SoftDelete + ret := &protos.FlowConnectionConfigs{ + FlowJobName: c.FlowJobName, + TableMappings: tblMappings, + Source: GeneratePostgresPeer(c.PostgresPort), + Destination: c.Destination, + CdcStagingPath: c.CdcStagingPath, + SoftDelete: c.SoftDelete, + SyncedAtColName: "_PEERDB_SYNCED_AT", + IdleTimeoutSeconds: 15, + } if ret.SoftDelete { ret.SoftDeleteColName = "_PEERDB_IS_DELETED" } - ret.SyncedAtColName = "_PEERDB_SYNCED_AT" - ret.IdleTimeoutSeconds = 10 return ret } diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 704150731d..6d5a549854 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "sync" + "time" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" @@ -15,6 +15,7 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" ) func (s PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { @@ -46,6 +47,32 @@ func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, ro return nil } +func (s PeerFlowE2ETestSuitePG) WaitForSchema( + env *testsuite.TestWorkflowEnvironment, + reason string, + srcTableName string, + dstTableName string, + cols string, + expectedSchema *protos.TableSchema, +) { + s.t.Helper() + e2e.EnvWaitFor(s.t, env, 3*time.Minute, reason, func() bool { + s.t.Helper() + output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ + TableIdentifiers: []string{dstTableName}, 
+ }) + if err != nil { + return false + } + tableSchema := output.TableNameSchemaMapping[dstTableName] + if !e2e.CompareTableSchemas(expectedSchema, tableSchema) { + s.t.Log("schemas unequal", expectedSchema, tableSchema) + return false + } + return s.comparePGTables(srcTableName, dstTableName, cols) == nil + }) +} + func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) @@ -79,7 +106,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -141,7 +168,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { } go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(my_mood, my_null_mood) VALUES ('happy',null) `, srcTableName)) @@ -187,35 +214,30 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 1, - MaxBatchSize: 100, + ExitAfterRecords: -1, + MaxBatchSize: 1, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. go func() { // insert first row. - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") - // verify we got our first row. 
- e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - expectedTableSchema := &protos.TableSchema{ - TableIdentifier: dstTableName, - ColumnNames: []string{"id", "c1"}, - ColumnTypes: []string{string(qvalue.QValueKindInt64), string(qvalue.QValueKindInt64)}, + s.WaitForSchema(env, "normalizing first row", srcTableName, dstTableName, "id,c1", &protos.TableSchema{ + TableIdentifier: dstTableName, + ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT"}, + ColumnTypes: []string{ + string(qvalue.QValueKindInt64), + string(qvalue.QValueKindInt64), + string(qvalue.QValueKindTimestamp), + }, PrimaryKeyColumns: []string{"id"}, - } - output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{dstTableName}, }) - e2e.EnvNoError(s.t, env, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - e2e.EnvNoError(s.t, env, err) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -227,25 +249,17 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c2 in the source table") - // verify we got our two rows, if schema did not match up it will error. 
- e2e.NormalizeFlowCountQuery(env, connectionGen, 4) - expectedTableSchema = &protos.TableSchema{ + s.WaitForSchema(env, "normalizing altered row", srcTableName, dstTableName, "id,c1,c2", &protos.TableSchema{ TableIdentifier: dstTableName, - ColumnNames: []string{"id", "c1", "c2"}, + ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT", "c2"}, ColumnTypes: []string{ string(qvalue.QValueKindInt64), string(qvalue.QValueKindInt64), + string(qvalue.QValueKindTimestamp), string(qvalue.QValueKindInt64), }, PrimaryKeyColumns: []string{"id"}, - } - output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{dstTableName}, }) - e2e.EnvNoError(s.t, env, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2") - e2e.EnvNoError(s.t, env, err) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -257,26 +271,18 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table") - // verify we got our two rows, if schema did not match up it will error. 
- e2e.NormalizeFlowCountQuery(env, connectionGen, 6) - expectedTableSchema = &protos.TableSchema{ + s.WaitForSchema(env, "normalizing dropped column row", srcTableName, dstTableName, "id,c1,c3", &protos.TableSchema{ TableIdentifier: dstTableName, - ColumnNames: []string{"id", "c1", "c2", "c3"}, + ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT", "c2", "c3"}, ColumnTypes: []string{ string(qvalue.QValueKindInt64), string(qvalue.QValueKindInt64), + string(qvalue.QValueKindTimestamp), string(qvalue.QValueKindInt64), string(qvalue.QValueKindInt64), }, PrimaryKeyColumns: []string{"id"}, - } - output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{dstTableName}, }) - e2e.EnvNoError(s.t, env, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3") - e2e.EnvNoError(s.t, env, err) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -288,36 +294,23 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row after dropping all columns in the source table") - // verify we got our two rows, if schema did not match up it will error. 
- e2e.NormalizeFlowCountQuery(env, connectionGen, 8) - expectedTableSchema = &protos.TableSchema{ + s.WaitForSchema(env, "normalizing 2nd dropped column row", srcTableName, dstTableName, "id,c1", &protos.TableSchema{ TableIdentifier: dstTableName, - ColumnNames: []string{"id", "c1", "c2", "c3"}, + ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT", "c2", "c3"}, ColumnTypes: []string{ string(qvalue.QValueKindInt64), string(qvalue.QValueKindInt64), + string(qvalue.QValueKindTimestamp), string(qvalue.QValueKindInt64), string(qvalue.QValueKindInt64), }, PrimaryKeyColumns: []string{"id"}, - } - output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ - TableIdentifiers: []string{dstTableName}, }) - e2e.EnvNoError(s.t, env, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - e2e.EnvNoError(s.t, env, err) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { @@ -346,17 +339,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 10, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) @@ -367,29 +359,22 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { } s.t.Log("Inserted 10 rows into the source table") - // verify we got our 10 rows - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize 10 rows", func() bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize modifications", func() bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") - - err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - require.NoError(s.t, err) } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { @@ -423,7 +408,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -433,7 +417,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { // in a separate 
goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -502,17 +486,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 10, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -524,26 +507,23 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { } s.t.Log("Inserted 10 rows into the source table") - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize 10 rows", func() bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") == nil + }) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) - }() - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize update", func() bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") == nil + }) - // allow only continue as new error - require.Contains(s.t, 
err.Error(), "continue as new") + env.CancelWorkflow() + }() - // verify our updates and delete happened - err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - require.NoError(s.t, err) + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) } func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { @@ -571,7 +551,6 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -579,7 +558,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { } go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 1 row into the source table testKey := fmt.Sprintf("test_key_%d", 1) testValue := fmt.Sprintf("test_value_%d", 1) @@ -647,27 +626,27 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 3, + ExitAfterRecords: -1, MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - defer wg.Done() - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize row", func() bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize update", func() bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) // since we delete stuff, create another table to compare with _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) @@ -675,23 +654,25 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) + + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool { + return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil + }) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - - wg.Wait() // verify our updates and delete happened err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t") require.NoError(s.t, err) - softDeleteQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, - dstTableName) - numRows, err := s.countRowsInQuery(softDeleteQuery) + 
softDeleteQuery := fmt.Sprintf( + `SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`, + dstTableName, + ) + numRows, err := s.RunInt64Query(softDeleteQuery) require.NoError(s.t, err) require.Equal(s.t, int64(1), numRows) } @@ -742,7 +723,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -774,9 +755,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { require.NoError(s.t, err) softDeleteQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`, dstTableName) - numRows, err := s.countRowsInQuery(softDeleteQuery) + numRows, err := s.RunInt64Query(softDeleteQuery) require.NoError(s.t, err) require.Equal(s.t, int64(1), numRows) } @@ -820,19 +801,21 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 4, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize row", func() bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -842,30 +825,27 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - // since we delete stuff, create another table to compare with - _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) + + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize transaction", func() bool { + return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil + }) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t") require.NoError(s.t, err) softDeleteQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`, dstTableName) - numRows, err := 
s.countRowsInQuery(softDeleteQuery) + numRows, err := s.RunInt64Query(softDeleteQuery) require.NoError(s.t, err) require.Equal(s.t, int64(1), numRows) } @@ -908,41 +888,43 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 3, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize row", func() bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool { + return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil + }) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize reinsert", func() bool { + return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil + }) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - - // verify our updates and delete happened - err = s.comparePGTables(srcTableName, dstTableName, 
"id,c1,c2,t") - require.NoError(s.t, err) softDeleteQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`, dstTableName) - numRows, err := s.countRowsInQuery(softDeleteQuery) + numRows, err := s.RunInt64Query(softDeleteQuery) require.NoError(s.t, err) require.Equal(s.t, int64(0), numRows) } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 6a26e2e8c4..fe98ffa01f 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -29,12 +29,21 @@ type PeerFlowE2ETestSuitePG struct { suffix string } +func (s PeerFlowE2ETestSuitePG) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuitePG) Pool() *pgxpool.Pool { + return s.pool +} + +func (s PeerFlowE2ETestSuitePG) Suffix() string { + return s.suffix +} + func TestPeerFlowE2ETestSuitePG(t *testing.T) { e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuitePG) { - err := e2e.TearDownPostgres(s.pool, s.suffix) - if err != nil { - require.Fail(s.t, "failed to drop Postgres schema", err) - } + e2e.TearDownPostgres(s) }) } @@ -167,7 +176,7 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { return rows.Err() } -func (s PeerFlowE2ETestSuitePG) countRowsInQuery(query string) (int64, error) { +func (s PeerFlowE2ETestSuitePG) RunInt64Query(query string) (int64, error) { var count pgtype.Int8 err := s.pool.QueryRow(context.Background(), query).Scan(&count) return count.Int64, err diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 415e1fdcbb..b26e44ad25 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -49,8 +49,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { } go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - e2e.EnvNoError(s.t, env, err) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows for i := 1; i <= 20; i++ 
{ testKey := fmt.Sprintf("test_key_%d", i) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 6eaa2ebc31..46dd16ef4c 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -24,13 +24,22 @@ type PeerFlowE2ETestSuiteS3 struct { suffix string } +func (s PeerFlowE2ETestSuiteS3) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteS3) Pool() *pgxpool.Pool { + return s.pool +} + +func (s PeerFlowE2ETestSuiteS3) Suffix() string { + return s.suffix +} + func tearDownSuite(s PeerFlowE2ETestSuiteS3) { - err := e2e.TearDownPostgres(s.pool, s.suffix) - if err != nil { - require.Fail(s.t, "failed to drop Postgres schema", err) - } + e2e.TearDownPostgres(s) - err = s.s3Helper.CleanUp() + err := s.s3Helper.CleanUp() if err != nil { require.Fail(s.t, "failed to clean up s3", err) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 3b53be2791..056de3a1fb 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -48,6 +48,7 @@ func (s PeerFlowE2ETestSuiteSF) Suffix() string { } func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error) { + s.t.Helper() qualifiedTableName := fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) s.t.Logf("running query on snowflake: %s", sfSelQuery) @@ -56,21 +57,17 @@ func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*m func TestPeerFlowE2ETestSuiteSF(t *testing.T) { e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { - err := e2e.TearDownPostgres(s.pool, s.pgSuffix) - if err != nil { - slog.Error("failed to tear down Postgres", slog.Any("error", err)) - s.t.FailNow() - } + e2e.TearDownPostgres(s) if s.sfHelper != nil { - err = s.sfHelper.Cleanup() + err := 
s.sfHelper.Cleanup() if err != nil { slog.Error("failed to tear down Snowflake", slog.Any("error", err)) s.t.FailNow() } } - err = s.connector.Close() + err := s.connector.Close() if err != nil { slog.Error("failed to close Snowflake connector", slog.Any("error", err)) s.t.FailNow() @@ -133,8 +130,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - srcTableName := s.attachSchemaSuffix("test_simple_flow_sf") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf") + tableName := "test_simple_flow_sf" + srcTableName := s.attachSchemaSuffix(tableName) + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -146,7 +144,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow"), + FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, @@ -155,14 +153,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 20, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 20 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -173,21 +171,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 20 rows 
into the source table") + e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,key,value") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") - - count, err := s.sfHelper.CountRows("test_simple_flow_sf") - require.NoError(s.t, err) - require.Equal(s.t, 20, count) - // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago // it should match the count. newerSyncedAtQuery := fmt.Sprintf(` @@ -196,16 +186,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) require.Equal(s.t, 20, numNewRows) - - // TODO: verify that the data is correctly synced to the destination table - // on the Snowflake side } func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - srcTableName := s.attachSchemaSuffix("test_replica_identity_no_pkey") + tableName := "test_replica_identity_no_pkey" + srcTableName := s.attachSchemaSuffix(tableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_replica_identity_no_pkey") // Create a table without a primary key and create a named unique index @@ -221,7 +209,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_flow"), + FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, @@ -237,7 +225,7 @@ func (s PeerFlowE2ETestSuiteSF) 
Test_Flow_ReplicaIdentity_Index_No_Pkey() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 20 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -268,7 +256,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc") + tableName := "test_invalid_geo_sf_avro_cdc" + srcTableName := s.attachSchemaSuffix(tableName) dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_invalid_geo_sf_avro_cdc") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -281,7 +270,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"), + FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, @@ -297,7 +286,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -377,7 +366,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - 
e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* Executing a transaction which 1. changes both toast column @@ -445,7 +434,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { go func() { defer wg.Done() - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* transaction updating no rows */ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -503,7 +492,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -573,7 +562,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -638,7 +627,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* transaction updating a single row multiple times with changed/unchanged toast columns @@ -709,7 +698,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go 
func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', @@ -794,7 +783,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); @@ -843,22 +832,21 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 1, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. go func() { - // insert first row. - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") - // verify we got our first row. 
- e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", "test_simple_schema_changes", "id,c1") + expectedTableSchema := &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), ColumnNames: []string{ @@ -878,8 +866,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableIdentifiers: []string{dstTableName}, }) e2e.EnvNoError(s.t, env, err) - e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") + e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -892,29 +879,27 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.t.Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. 
- e2e.NormalizeFlowCountQuery(env, connectionGen, 4) + e2e.EnvWaitForEqualTables(env, s, "normalize altered row", "test_simple_schema_changes", "id,c1,c2") expectedTableSchema = &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), ColumnNames: []string{ "ID", "C1", - "C2", - "_PEERDB_IS_DELETED", "_PEERDB_SYNCED_AT", + "C2", }, ColumnTypes: []string{ string(qvalue.QValueKindNumeric), string(qvalue.QValueKindNumeric), - string(qvalue.QValueKindNumeric), - string(qvalue.QValueKindBoolean), string(qvalue.QValueKindTimestamp), + string(qvalue.QValueKindNumeric), }, } output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) e2e.EnvNoError(s.t, env, err) - e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. @@ -928,31 +913,29 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. 
- e2e.NormalizeFlowCountQuery(env, connectionGen, 6) + e2e.EnvWaitForEqualTables(env, s, "normalize dropped c2 column", "test_simple_schema_changes", "id,c1,c3") expectedTableSchema = &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), ColumnNames: []string{ "ID", "C1", + "_PEERDB_SYNCED_AT", "C2", "C3", - "_PEERDB_IS_DELETED", - "_PEERDB_SYNCED_AT", }, ColumnTypes: []string{ string(qvalue.QValueKindNumeric), string(qvalue.QValueKindNumeric), + string(qvalue.QValueKindTimestamp), string(qvalue.QValueKindNumeric), string(qvalue.QValueKindNumeric), - string(qvalue.QValueKindBoolean), - string(qvalue.QValueKindTimestamp), }, } output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) e2e.EnvNoError(s.t, env, err) - e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3") // alter source table, drop column c3 and insert another row. @@ -966,42 +949,35 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. 
- e2e.NormalizeFlowCountQuery(env, connectionGen, 8) + e2e.EnvWaitForEqualTables(env, s, "normalize dropped c3 column", "test_simple_schema_changes", "id,c1") expectedTableSchema = &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), ColumnNames: []string{ "ID", "C1", + "_PEERDB_SYNCED_AT", "C2", "C3", - "_PEERDB_IS_DELETED", - "_PEERDB_SYNCED_AT", }, ColumnTypes: []string{ string(qvalue.QValueKindNumeric), string(qvalue.QValueKindNumeric), + string(qvalue.QValueKindTimestamp), string(qvalue.QValueKindNumeric), string(qvalue.QValueKindNumeric), - string(qvalue.QValueKindBoolean), - string(qvalue.QValueKindTimestamp), }, } output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) e2e.EnvNoError(s.t, env, err) - e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + e2e.EnvTrue(s.t, env, e2e.CompareTableSchemas(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])) e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { @@ -1032,14 +1008,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 10, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) @@ -1050,28 +1026,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } s.t.Log("Inserted 10 rows into the source table") - // verify we got our 10 rows - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - e2e.EnvEqualTables(env, s, "test_simple_cpkey", "id,c1,c2,t") + e2e.EnvWaitForEqualTables(env, s, "normalize table", "test_simple_cpkey", "id,c1,c2,t") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", "test_simple_cpkey", "id,c1,c2,t") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") - - // verify our updates and delete happened - e2e.RequireEqualTables(s, "test_simple_cpkey", "id,c1,c2,t") } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { @@ -1110,7 +1077,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1151,8 +1118,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast2") + tableName := "test_cpkey_toast2" + srcTableName := s.attachSchemaSuffix(tableName) + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1167,7 +1135,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), + FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, @@ -1176,14 +1144,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 10, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1195,33 +1163,27 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } s.t.Log("Inserted 10 rows into the source table") - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c2,t,t2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,c2,t,t2") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") - - // verify our updates and delete happened - e2e.RequireEqualTables(s, "test_cpkey_toast2", "id,c1,c2,t,t2") } func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - srcTableName := s.attachSchemaSuffix("test_exclude_sf") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_exclude_sf") + tableName := "test_exclude_sf" + srcTableName := s.attachSchemaSuffix(tableName) + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1236,7 +1198,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - 
FlowJobName: s.attachSuffix("test_exclude_flow"), + FlowJobName: s.attachSuffix(tableName), } config := &protos.FlowConnectionConfigs{ @@ -1255,14 +1217,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 10, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1274,38 +1236,36 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { } s.t.Log("Inserted 10 rows into the source table") - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,c1,t,t2") _, err = s.pool.Exec(context.Background(), - fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) + fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=0`, srcTableName)) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,c1,t,t2") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id", - s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName) - sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query) + sfRows, err := s.GetRows(tableName, "*") require.NoError(s.t, err) for _, field := range 
sfRows.Schema.Fields { require.NotEqual(s.t, field.Name, "c2") } require.Equal(s.t, 5, len(sfRows.Schema.Fields)) - require.Equal(s.t, 10, len(sfRows.Records)) } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - cmpTableName := s.attachSchemaSuffix("test_softdel") - srcTableName := fmt.Sprintf("%s_src", cmpTableName) - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel") + tableName := "test_softdel_src" + dstName := "test_softdel" + srcTableName := s.attachSchemaSuffix(tableName) + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, dstName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1318,7 +1278,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_softdel"), + FlowJobName: s.attachSuffix(dstName), } config := &protos.FlowConnectionConfigs{ @@ -1338,48 +1298,42 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 3, + ExitAfterRecords: -1, MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. 
go func() { - defer wg.Done() - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize row", tableName, dstName, "id,c1,c2,t") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - // since we delete stuff, create another table to compare with - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update", tableName, dstName, "id,c1,c2,t") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTablesWithNames( + env, + s, + "normalize delete", + tableName, + dstName+" WHERE NOT _PEERDB_IS_DELETED", + "id,c1,c2,t", + ) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - - wg.Wait() - - // verify our updates and delete happened - e2e.RequireEqualTables(s, "test_softdel", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) require.Equal(s.t, 1, numNewRows) @@ -1431,7 +1385,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { // in a 
separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1462,7 +1416,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { e2e.RequireEqualTables(s, "test_softdel_iud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) require.Equal(s.t, 1, numNewRows) @@ -1472,9 +1426,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - cmpTableName := s.attachSchemaSuffix("test_softdel_ud") - srcTableName := fmt.Sprintf("%s_src", cmpTableName) - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_ud") + tableName := "test_softdel_ud_src" + dstName := "test_softdel_ud" + srcTableName := s.attachSchemaSuffix(tableName) + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, dstName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1487,7 +1442,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_softdel_ud"), + FlowJobName: s.attachSuffix(dstName), } config := &protos.FlowConnectionConfigs{ @@ -1507,19 +1462,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 4, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish 
setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize insert", tableName, dstName, "id,c1,c2,t") insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1529,27 +1484,27 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - // since we delete stuff, create another table to compare with - _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) + e2e.EnvWaitForEqualTablesWithNames( + env, + s, + "normalize transaction", + tableName, + dstName+" WHERE NOT _PEERDB_IS_DELETED", + "id,c1,c2,t", + ) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - - // verify our updates and delete happened - e2e.RequireEqualTables(s, "test_softdel_ud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) 
require.Equal(s.t, 1, numNewRows) @@ -1559,8 +1514,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) - srcTableName := s.attachSchemaSuffix("test_softdel_iad") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iad") + tableName := "test_softdel_iad" + srcTableName := s.attachSchemaSuffix(tableName) + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -1573,7 +1529,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_softdel_iad"), + FlowJobName: s.attachSuffix(tableName), } config := &protos.FlowConnectionConfigs{ @@ -1593,38 +1549,43 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { } limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 3, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and delete rows in the table. 
go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + e2e.EnvWaitForEqualTables(env, s, "normalize row", tableName, "id,c1,c2,t") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitForEqualTablesWithNames( + env, + s, + "normalize delete", + tableName, + tableName+" WHERE NOT _PEERDB_IS_DELETED", + "id,c1,c2,t", + ) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", tableName, "id,c1,c2,t") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - require.Contains(s.t, err.Error(), "continue as new") - - // verify our updates and delete happened - e2e.RequireEqualTables(s, "test_softdel_iad", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) + SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) require.Equal(s.t, 0, numNewRows) @@ -1657,14 +1618,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ - ExitAfterRecords: 20, + ExitAfterRecords: -1, MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 20 
rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -1675,17 +1636,17 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 20 rows into the source table") + e2e.EnvWaitForEqualTablesWithNames( + env, + s, + "normalize mixed case", + "testMixedCase", + "\"testMixedCase\"", + "id,\"pulseArmor\",\"highGold\",\"eVe\"", + ) + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") - - s.compareTableContentsWithDiffSelectorsSF("testMixedCase", `"pulseArmor","highGold","eVe",id`, - `"pulseArmor","highGold","eVe",id`, true) } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index e14bf4ec0b..b23ec8bb67 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -32,15 +32,10 @@ func (s PeerFlowE2ETestSuiteSF) checkJSONValue(tableName, colName, fieldName, va return nil } -func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableName, pgSelector, sfSelector string, - tableCaseSensitive bool, -) { +func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableName, pgSelector, sfSelector string) { pgRows, err := e2e.GetPgRows(s.pool, s.pgSuffix, tableName, pgSelector) require.NoError(s.t, err) - if tableCaseSensitive { - tableName = fmt.Sprintf("\"%s\"", tableName) - } sfRows, err := s.GetRows(tableName, sfSelector) require.NoError(s.t, err) @@ -83,7 +78,7 @@ func (s PeerFlowE2ETestSuiteSF) 
Test_Complete_QRep_Flow_Avro_SF() { require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) err = s.checkJSONValue(dstSchemaQualified, "f7", "key", "\"value\"") require.NoError(s.t, err) @@ -129,7 +124,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { @@ -169,7 +164,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { @@ -213,7 +208,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { @@ -256,7 +251,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) } func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index 
8dd9bfa60a..88ce61e60d 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -42,13 +42,13 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) { return nil, fmt.Errorf("failed to read file: %w", err) } - var config protos.SnowflakeConfig + var config *protos.SnowflakeConfig err = json.Unmarshal(content, &config) if err != nil { return nil, fmt.Errorf("failed to unmarshal json: %w", err) } - peer := generateSFPeer(&config) + peer := generateSFPeer(config) runID, err := shared.RandomUInt64() if err != nil { return nil, fmt.Errorf("failed to generate random uint64: %w", err) @@ -56,7 +56,7 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) { testDatabaseName := fmt.Sprintf("e2e_test_%d", runID) - adminClient, err := connsnowflake.NewSnowflakeClient(context.Background(), &config) + adminClient, err := connsnowflake.NewSnowflakeClient(context.Background(), config) if err != nil { return nil, fmt.Errorf("failed to create Snowflake client: %w", err) } @@ -66,13 +66,13 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) { } config.Database = testDatabaseName - testClient, err := connsnowflake.NewSnowflakeClient(context.Background(), &config) + testClient, err := connsnowflake.NewSnowflakeClient(context.Background(), config) if err != nil { return nil, fmt.Errorf("failed to create Snowflake client: %w", err) } return &SnowflakeTestHelper{ - Config: &config, + Config: config, Peer: peer, adminClient: adminClient, testClient: testClient, diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 25f11dfc44..f7378ea2ae 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -29,13 +29,24 @@ type PeerFlowE2ETestSuiteSQLServer struct { suffix string } +func (s PeerFlowE2ETestSuiteSQLServer) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteSQLServer) Pool() *pgxpool.Pool { 
+ return s.pool +} + +func (s PeerFlowE2ETestSuiteSQLServer) Suffix() string { + return s.suffix +} + func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) { e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSQLServer) { - err := e2e.TearDownPostgres(s.pool, s.suffix) - require.NoError(s.t, err) + e2e.TearDownPostgres(s) if s.sqlsHelper != nil { - err = s.sqlsHelper.CleanUp() + err := s.sqlsHelper.CleanUp() require.NoError(s.t, err) } }) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index e043e97d54..641fa594e5 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -8,6 +8,7 @@ import ( "log/slog" "os" "runtime" + "slices" "strings" "testing" "time" @@ -15,8 +16,7 @@ import ( "github.com/PeerDB-io/peer-flow/activities" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" - conn_utils "github.com/PeerDB-io/peer-flow/connectors/utils" - utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" @@ -36,13 +36,13 @@ import ( func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnvironment) { t.Helper() - conn, err := utils.GetCatalogConnectionPoolFromEnv() + conn, err := pgxpool.New(context.Background(), utils.GetPGConnectionString(GetTestPostgresConf())) if err != nil { t.Fatalf("unable to create catalog connection pool: %v", err) } - // set a 300 second timeout for the workflow to execute a few runs. - env.SetTestTimeout(300 * time.Second) + // set a 5 minute timeout for the workflow to execute a few runs. 
+ env.SetTestTimeout(5 * time.Minute) env.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig) env.RegisterWorkflow(peerflow.SyncFlowWorkflow) @@ -73,18 +73,17 @@ func EnvNoError(t *testing.T, env *testsuite.TestWorkflowEnvironment, err error) t.Helper() if err != nil { - t.Error(err.Error()) + t.Error("UNEXPECTED ERROR", err.Error()) env.CancelWorkflow() runtime.Goexit() } } -// See EnvNoError -func EnvEqual[T comparable](t *testing.T, env *testsuite.TestWorkflowEnvironment, x T, y T) { +func EnvTrue(t *testing.T, env *testsuite.TestWorkflowEnvironment, val bool) { t.Helper() - if x != y { - t.Error("not equal", x, y) + if !val { + t.Error("UNEXPECTED FALSE") env.CancelWorkflow() runtime.Goexit() } @@ -129,41 +128,56 @@ func EnvEqualTables(env *testsuite.TestWorkflowEnvironment, suite e2eshared.RowS EnvEqualRecordBatches(t, env, pgRows, rows) } -func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, - connectionGen FlowConnectionGenerationConfig, +func EnvWaitForEqualTables( + env *testsuite.TestWorkflowEnvironment, + suite e2eshared.RowSource, + reason string, + table string, + cols string, ) { - // wait for PeerFlowStatusQuery to finish setup - // sleep for 5 second to allow the workflow to start - time.Sleep(5 * time.Second) - for { - response, err := env.QueryWorkflow( - shared.CDCFlowStateQuery, - connectionGen.FlowJobName, - ) - if err == nil { - var state peerflow.CDCFlowWorkflowState - err = response.Get(&state) - if err != nil { - slog.Error(err.Error()) - } else if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { - break - } - } else { - // log the error for informational purposes - slog.Error(err.Error()) + suite.T().Helper() + EnvWaitForEqualTablesWithNames(env, suite, reason, table, table, cols) +} + +func EnvWaitForEqualTablesWithNames( + env *testsuite.TestWorkflowEnvironment, + suite e2eshared.RowSource, + reason string, + srcTable string, + dstTable string, + cols string, +) { + t := suite.T() + t.Helper() + + 
EnvWaitFor(t, env, 3*time.Minute, reason, func() bool { + t.Helper() + + suffix := suite.Suffix() + pool := suite.Pool() + pgRows, err := GetPgRows(pool, suffix, srcTable, cols) + if err != nil { + return false } - time.Sleep(1 * time.Second) - } + + rows, err := suite.GetRows(dstTable, cols) + if err != nil { + return false + } + + return e2eshared.CheckEqualRecordBatches(t, pgRows, rows) + }) } -func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, +func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironment, connectionGen FlowConnectionGenerationConfig, - minCount int, ) { - // wait for PeerFlowStatusQuery to finish setup - // sleep for 5 second to allow the workflow to start - time.Sleep(5 * time.Second) + t.Helper() + // errors expected while PeerFlowStatusQuery is setup + counter := 0 for { + time.Sleep(time.Second) + counter++ response, err := env.QueryWorkflow( shared.CDCFlowStateQuery, connectionGen.FlowJobName, @@ -173,14 +187,17 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, err = response.Get(&state) if err != nil { slog.Error(err.Error()) - } else if len(state.NormalizeFlowStatuses) >= minCount { - break + } else if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { + return } - } else { + } else if counter > 15 { + t.Error("UNEXPECTED SETUP CDC TIMEOUT", err.Error()) + env.CancelWorkflow() + runtime.Goexit() + } else if counter > 5 { // log the error for informational purposes slog.Error(err.Error()) } - time.Sleep(1 * time.Second) } } @@ -238,7 +255,7 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err tblFieldStr := strings.Join(tblFields, ",") var pgErr *pgconn.PgError _, enumErr := pool.Exec(context.Background(), createMoodEnum) - if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !conn_utils.IsUniqueError(pgErr) { + if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(pgErr) { 
return enumErr } _, err := pool.Exec(context.Background(), fmt.Sprintf(` @@ -511,17 +528,35 @@ func (l *TStructuredLogger) Error(msg string, keyvals ...interface{}) { l.logger.With(l.keyvalsToFields(keyvals)).Error(msg) } +func CompareTableSchemas(x *protos.TableSchema, y *protos.TableSchema) bool { + return x.TableIdentifier == y.TableIdentifier && + x.IsReplicaIdentityFull == y.IsReplicaIdentityFull && + slices.Compare(x.PrimaryKeyColumns, y.PrimaryKeyColumns) == 0 && + slices.Compare(x.ColumnNames, y.ColumnNames) == 0 && + slices.Compare(x.ColumnTypes, y.ColumnTypes) == 0 +} + func RequireEqualRecordBatches(t *testing.T, q *model.QRecordBatch, other *model.QRecordBatch) { t.Helper() require.True(t, e2eshared.CheckEqualRecordBatches(t, q, other)) } -// See EnvNoError func EnvEqualRecordBatches(t *testing.T, env *testsuite.TestWorkflowEnvironment, q *model.QRecordBatch, other *model.QRecordBatch) { t.Helper() + EnvTrue(t, env, e2eshared.CheckEqualRecordBatches(t, q, other)) +} - if !e2eshared.CheckEqualRecordBatches(t, q, other) { - env.CancelWorkflow() - runtime.Goexit() +func EnvWaitFor(t *testing.T, env *testsuite.TestWorkflowEnvironment, timeout time.Duration, reason string, f func() bool) { + t.Helper() + t.Log("WaitFor", reason, time.Now()) + + deadline := time.Now().Add(timeout) + for !f() { + if time.Now().After(deadline) { + t.Error("UNEXPECTED TIMEOUT", reason, time.Now()) + env.CancelWorkflow() + runtime.Goexit() + } + time.Sleep(time.Second) } } diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index c9a5eca325..176564d342 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -12,10 +12,14 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -type RowSource interface { +type Suite interface { T() *testing.T Pool() *pgxpool.Pool Suffix() string +} + +type RowSource interface { + Suite GetRows(table, cols string) (*model.QRecordBatch, error) } @@ -111,8 +115,6 @@ func CheckEqualRecordBatches(t *testing.T, q 
*model.QRecordBatch, other *model.Q for i, record := range q.Records { if !CheckQRecordEquality(t, record, other.Records[i]) { t.Logf("Record %d is not equal", i) - t.Logf("Record 1: %v", record) - t.Logf("Record 2: %v", other.Records[i]) return false } } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index ebbf15f807..f6a5bda189 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -216,7 +216,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) @@ -242,8 +243,9 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - TaskQueue: taskQueue, - SearchAttributes: mirrorNameSearch, + TaskQueue: taskQueue, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) @@ -331,6 +333,10 @@ func CDCFlowWorkflowWithConfig( // check and act on signals before a fresh flow starts. 
w.receiveAndHandleSignalAsync(ctx, state) + if err := ctx.Err(); err != nil { + return nil, err + } + if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED @@ -343,6 +349,8 @@ func CDCFlowWorkflowWithConfig( ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) + } else if err := ctx.Err(); err != nil { + return nil, err } } @@ -386,7 +394,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping @@ -458,7 +467,8 @@ func CDCFlowWorkflowWithConfig( RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 20, }, - SearchAttributes: mirrorNameSearch, + SearchAttributes: mirrorNameSearch, + WaitForCancellation: true, } normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 09849b1752..0b82ca4c22 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -41,6 +41,7 @@ func (s *SyncFlowExecution) executeSyncFlow( syncMetaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Minute, + WaitForCancellation: true, }) // execute GetLastSyncedID on destination peer @@ -65,6 +66,7 @@ func (s *SyncFlowExecution) executeSyncFlow( startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, HeartbeatTimeout: 30 * time.Second, + WaitForCancellation: true, }) // execute StartFlow on the peers to start the flow @@ -83,6 +85,7 @@ func (s *SyncFlowExecution) executeSyncFlow( 
replayTableSchemaDeltaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 30 * time.Minute, + WaitForCancellation: true, }) replayTableSchemaInput := &protos.ReplayTableSchemaDeltaInput{ FlowConnectionConfigs: config,