diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 795d191ab6..cc3c7049d7 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -18,6 +18,7 @@ import ( "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/google/uuid" @@ -65,7 +66,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, time.Sleep(5 * time.Second) for { response, err := env.QueryWorkflow( - peerflow.CDCFlowStatusQuery, + shared.CDCFlowStateQuery, connectionGen.FlowJobName, ) if err == nil { @@ -75,7 +76,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, slog.Error(err.Error()) } - if state.SnapshotComplete { + if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING.Enum() { break } } else { @@ -95,7 +96,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, time.Sleep(5 * time.Second) for { response, err := env.QueryWorkflow( - peerflow.CDCFlowStatusQuery, + shared.CDCFlowStateQuery, connectionGen.FlowJobName, ) if err == nil {