diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index c13e02b6e0..ee3e6a9a7f 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -12,12 +12,16 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/joho/godotenv" "github.com/stretchr/testify/require" + "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/workflow" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" ) type PeerFlowE2ETestSuitePG struct { @@ -290,10 +294,83 @@ func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_P // Verify workflow completes without error require.True(s.t, env.IsWorkflowCompleted()) + require.NoError(s.t, env.GetWorkflowError()) - err = env.GetWorkflowError() + err = s.checkSyncedAt(dstSchemaQualified) require.NoError(s.t, err) +} - err = s.checkSyncedAt(dstSchemaQualified) +func PauseTestWorkflow(ctx workflow.Context, workflowID string, config *protos.QRepConfig) error { + state := peerflow.NewQRepFlowState() + qrepFlowOpts := workflow.ChildWorkflowOptions{ + WorkflowID: workflowID, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + WaitForCancellation: true, + } + ctx = workflow.WithChildOptions(ctx, qrepFlowOpts) + + qrepFlow := workflow.ExecuteChildWorkflow(ctx, peerflow.QRepFlowWorkflow, config, state) + model.FlowSignal.SignalChildWorkflow(ctx, qrepFlow, model.PauseSignal) + + signalChan := model.FlowSignal.GetSignalChannel(ctx) + unpause, _ := signalChan.Receive(ctx) + model.FlowSignal.SignalChildWorkflow(ctx, qrepFlow, unpause) + + ctx.Done().Receive(ctx, nil) + return ctx.Err() +} + +func (s PeerFlowE2ETestSuitePG) Test_Pause() { + numRows := 10 + + srcTable := "qrep_pause" + s.setupSourceTable(srcTable, numRows) + + dstTable := "qrep_pause_dst" + + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + s.suffix, srcTable) + + config, err := e2e.CreateQRepWorkflowConfig( + "test_qrep_columns_pg", + srcSchemaQualified, + dstSchemaQualified, + query, + e2e.GeneratePostgresPeer(e2e.PostgresPort), + "", + true, + "_PEERDB_SYNCED_AT", + ) require.NoError(s.t, err) + + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env.RegisterWorkflow(PauseTestWorkflow) + + go func() { + e2e.EnvWaitFor(s.t, env, time.Minute, "pausing", func() bool { + response, err := env.QueryWorkflow(shared.QRepFlowStateQuery) + if err != nil { + return false + } + var state peerflow.CDCFlowWorkflowState + err = response.Get(&state) + return err != nil && state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED + }) + e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal) + e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool { + response, err := env.QueryWorkflow(shared.QRepFlowStateQuery) + if err != nil { + return false + } + var state peerflow.CDCFlowWorkflowState + err = response.Get(&state) + return err != nil && state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING + }) + env.CancelWorkflow() + }() + env.ExecuteWorkflow(PauseTestWorkflow, "test-pause", config) + e2e.RequireEnvCanceled(s.t, env) }