Skip to content

Commit

Permalink
Test qrep pause/unpause
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 27, 2024
1 parent 865d42a commit 6d714f2
Showing 1 changed file with 79 additions and 2 deletions.
81 changes: 79 additions & 2 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/joho/godotenv"
"github.com/stretchr/testify/require"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/workflow"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

type PeerFlowE2ETestSuitePG struct {
Expand Down Expand Up @@ -290,10 +294,83 @@ func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_P

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
require.NoError(s.t, env.GetWorkflowError())

err = env.GetWorkflowError()
err = s.checkSyncedAt(dstSchemaQualified)
require.NoError(s.t, err)
}

err = s.checkSyncedAt(dstSchemaQualified)
func PauseTestWorkflow(ctx workflow.Context, workflowID string, config *protos.QRepConfig) error {
state := peerflow.NewQRepFlowState()
qrepFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: workflowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
WaitForCancellation: true,
}
ctx = workflow.WithChildOptions(ctx, qrepFlowOpts)

qrepFlow := workflow.ExecuteChildWorkflow(ctx, peerflow.QRepFlowWorkflow, config, state)
model.FlowSignal.SignalChildWorkflow(ctx, qrepFlow, model.PauseSignal)

signalChan := model.FlowSignal.GetSignalChannel(ctx)
unpause, _ := signalChan.Receive(ctx)
model.FlowSignal.SignalChildWorkflow(ctx, qrepFlow, unpause)

ctx.Done().Receive(ctx, nil)
return ctx.Err()
}

func (s PeerFlowE2ETestSuitePG) Test_Pause() {
numRows := 10

srcTable := "qrep_pause"
s.setupSourceTable(srcTable, numRows)

dstTable := "qrep_pause_dst"

srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
s.suffix, srcTable)

config, err := e2e.CreateQRepWorkflowConfig(
"test_qrep_columns_pg",
srcSchemaQualified,
dstSchemaQualified,
query,
e2e.GeneratePostgresPeer(e2e.PostgresPort),
"",
true,
"_PEERDB_SYNCED_AT",
)
require.NoError(s.t, err)

env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
env.RegisterWorkflow(PauseTestWorkflow)

go func() {
e2e.EnvWaitFor(s.t, env, time.Minute, "pausing", func() bool {
response, err := env.QueryWorkflow(shared.QRepFlowStateQuery)
if err != nil {
return false
}
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
return err != nil && state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED
})
e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal)
e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool {
response, err := env.QueryWorkflow(shared.QRepFlowStateQuery)
if err != nil {
return false
}
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
return err != nil && state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING
})
env.CancelWorkflow()
}()
env.ExecuteWorkflow(PauseTestWorkflow, "test-pause", config)
e2e.RequireEnvCanceled(s.t, env)
}

0 comments on commit 6d714f2

Please sign in to comment.