diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index c18c14d518..9a6e83a5a2 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -998,7 +998,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. - e2e.EnvWaitForEqualTables(env, s, "normalize altered row", tableName, "id,c1") + e2e.EnvWaitForEqualTables(env, s, "normalize drop column", tableName, "id,c1") env.CancelWorkflow() }() diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index d9bb05a222..62636dd526 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -169,11 +169,12 @@ func EnvWaitForEqualTablesWithNames( func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, connectionGen FlowConnectionGenerationConfig, -) { - // wait for PeerFlowStatusQuery to finish setup - // sleep for 5 second to allow the workflow to start - time.Sleep(5 * time.Second) +) error { + // errors expected while PeerFlowStatusQuery is setup + counter := 0 for { + time.Sleep(time.Second) + counter++ response, err := env.QueryWorkflow( shared.CDCFlowStateQuery, connectionGen.FlowJobName, @@ -186,13 +187,14 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, } if *state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { - break + return nil } - } else { + } else if counter > 15 { + return err + } else if counter > 5 { // log the error for informational purposes slog.Error(err.Error()) } - time.Sleep(1 * time.Second) } }