diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 8314032bea..01cd5a71ec 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -858,16 +858,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. + wg := sync.WaitGroup{} + wg.Add(1) go func() { - // insert first row. + defer wg.Done() + e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") - // verify we got our first row. - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", "test_simple_schema_changes", "id,c1") + expectedTableSchema := &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), ColumnNames: []string{ @@ -901,7 +904,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.t.Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. - e2e.NormalizeFlowCountQuery(env, connectionGen, 4) + e2e.EnvWaitForEqualTables(env, s, "normalize altered row", "test_simple_schema_changes", "id,c1,c2") expectedTableSchema = &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), ColumnNames: []string{ @@ -937,7 +940,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. - e2e.NormalizeFlowCountQuery(env, connectionGen, 6) + e2e.EnvWaitForEqualTables(env, s, "normalize dropped c2 column", "test_simple_schema_changes", "id,c1,c3") expectedTableSchema = &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), ColumnNames: []string{ @@ -975,7 +978,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. - e2e.NormalizeFlowCountQuery(env, connectionGen, 8) + e2e.EnvWaitForEqualTables(env, s, "normalize dropped c3 column", "test_simple_schema_changes", "id,c1") expectedTableSchema = &protos.TableSchema{ TableIdentifier: strings.ToUpper(dstTableName), ColumnNames: []string{ @@ -1001,18 +1004,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.EnvNoError(s.t, env, err) e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") + + env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") - - env.AssertExpectations(s.t) + wg.Wait() } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index e8c0952b56..7bd24200ea 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -125,6 +125,31 @@ func EnvEqualTables(env *testsuite.TestWorkflowEnvironment, suite e2eshared.RowS EnvEqualRecordBatches(t, env, pgRows, rows) } +func EnvWaitForEqualTables( + env *testsuite.TestWorkflowEnvironment, + suite e2eshared.RowSource, + reason string, + table string, + cols string, +) { + t := suite.T() + EnvWaitFor(t, env, time.Minute, reason, func(ctx context.Context) bool { + suffix := suite.Suffix() + pool := suite.Pool() + pgRows, err := GetPgRows(pool, suffix, table, cols) + if err != nil { + return false + } + + rows, err := suite.GetRows(table, cols) + if err != nil { + return false + } + + return e2eshared.CheckEqualRecordBatches(t, pgRows, rows) + }) +} + func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, connectionGen FlowConnectionGenerationConfig, ) {