Skip to content

Commit

Permalink
adding visibility into current mirror state (#971)
Browse files Browse the repository at this point in the history
Breaks some protos and routes for clearer naming conventions; the UI needs
to be updated accordingly.

removes SetupComplete and SnapshotComplete booleans in CDCFlowState in
favour of a CurrentFlowState that expresses all states of a CDC workflow
in a single variable.

rebase of PR: #657

---------

Co-authored-by: Kevin Biju <[email protected]>
  • Loading branch information
iskakaushik and heavycrystal authored Jan 3, 2024
1 parent 4bc7879 commit c18b45e
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 80 deletions.
52 changes: 46 additions & 6 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,26 +436,55 @@ func (h *FlowRequestHandler) FlowStateChange(
ctx context.Context,
req *protos.FlowStateChangeRequest,
) (*protos.FlowStateChangeResponse, error) {
var err error
if req.RequestedFlowState == protos.FlowState_STATE_PAUSED {
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
}
currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
return nil, err
}
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
*currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum())
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
workflowID,
"",
shared.CDCFlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowState_STATE_RUNNING {
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
*currState == protos.FlowStatus_STATUS_PAUSED {
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
workflowID,
"",
shared.CDCFlowSignalName,
shared.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum())
if err != nil {
return nil, err
}
_, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{
WorkflowId: workflowID,
FlowJobName: req.FlowJobName,
SourcePeer: req.SourcePeer,
DestinationPeer: req.DestinationPeer,
RemoveFlowEntry: false,
})
} else {
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
return nil, fmt.Errorf("unable to signal CDCFlow workflow: %w", err)
}

return &protos.FlowStateChangeResponse{
Expand Down Expand Up @@ -710,3 +739,14 @@ func (h *FlowRequestHandler) DropPeer(
Ok: true,
}, nil
}

// getWorkflowID looks up the workflow_id stored in the flows table for the
// row whose name matches flowJobName via ILIKE, and returns it.
// A failure to scan the row is wrapped together with the flow job name.
func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName string) (string, error) {
	const lookupQuery = "SELECT workflow_id FROM flows WHERE name ILIKE $1"

	var id string
	scanErr := h.pool.QueryRow(ctx, lookupQuery, flowJobName).Scan(&id)
	if scanErr != nil {
		return "", fmt.Errorf("unable to get workflowID for flow job %s: %w", flowJobName, scanErr)
	}
	return id, nil
}
37 changes: 37 additions & 0 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5/pgtype"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -23,6 +24,18 @@ func (h *FlowRequestHandler) MirrorStatus(
}, nil
}

workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to get flow state: %s", err.Error()),
}, nil
}

if cdcFlow {
cdcStatus, err := h.CDCFlowStatus(ctx, req)
if err != nil {
Expand All @@ -36,6 +49,7 @@ func (h *FlowRequestHandler) MirrorStatus(
Status: &protos.MirrorStatusResponse_CdcStatus{
CdcStatus: cdcStatus,
},
CurrentFlowState: *currState,
}, nil
} else {
qrepStatus, err := h.QRepFlowStatus(ctx, req)
Expand All @@ -50,6 +64,7 @@ func (h *FlowRequestHandler) MirrorStatus(
Status: &protos.MirrorStatusResponse_QrepStatus{
QrepStatus: qrepStatus,
},
CurrentFlowState: *currState,
}, nil
}
}
Expand Down Expand Up @@ -272,3 +287,25 @@ func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJob

return flowNames, nil
}

// getWorkflowStatus runs the shared.FlowStatusQuery query against the workflow
// identified by workflowID (with an empty run ID) and decodes the reply into a
// protos.FlowStatus.
//
// On success the returned pointer is guaranteed to be non-nil. Callers
// dereference the result without a nil check, so a query that decodes to a
// nil status (for example, if the workflow's handler returned a nil pointer)
// is reported as an error rather than handed back.
func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (*protos.FlowStatus, error) {
	res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery)
	if err != nil {
		return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
	}

	var state *protos.FlowStatus
	if err := res.Get(&state); err != nil {
		return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
	}
	// Decoding can succeed yet leave the pointer unset; guard against it here
	// so that callers never dereference nil.
	if state == nil {
		return nil, fmt.Errorf("workflow with ID %s returned no flow status", workflowID)
	}
	return state, nil
}

// updateWorkflowStatus sends the shared.FlowStatusUpdate update, carrying the
// given state, to the workflow identified by workflowID (with an empty run ID).
// The first return value of UpdateWorkflow is discarded; only an error from
// the call itself is reported, wrapped with the workflow ID.
func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context,
	workflowID string, state *protos.FlowStatus,
) error {
	if _, updateErr := h.temporalClient.UpdateWorkflow(
		ctx, workflowID, "", shared.FlowStatusUpdate, state,
	); updateErr != nil {
		return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, updateErr)
	}
	return nil
}
1 change: 0 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,6 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
normalizeStatements := c.generateNormalizeStatements(destinationTableName, unchangedToastColsMap[destinationTableName],
rawTableIdentifier, supportsMerge, &peerdbCols)
fmt.Println(normalizeStatements)
for _, normalizeStatement := range normalizeStatements {
mergeStatementsBatch.Queue(normalizeStatement, batchIDs.NormalizeBatchID, batchIDs.SyncBatchID, destinationTableName).Exec(
func(ct pgconn.CommandTag) error {
Expand Down
7 changes: 4 additions & 3 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/google/uuid"
Expand Down Expand Up @@ -65,7 +66,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment,
time.Sleep(5 * time.Second)
for {
response, err := env.QueryWorkflow(
peerflow.CDCFlowStatusQuery,
shared.CDCFlowStateQuery,
connectionGen.FlowJobName,
)
if err == nil {
Expand All @@ -75,7 +76,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment,
slog.Error(err.Error())
}

if state.SnapshotComplete {
if *state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
break
}
} else {
Expand All @@ -95,7 +96,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment,
time.Sleep(5 * time.Second)
for {
response, err := env.QueryWorkflow(
peerflow.CDCFlowStatusQuery,
shared.CDCFlowStateQuery,
connectionGen.FlowJobName,
)
if err == nil {
Expand Down
15 changes: 13 additions & 2 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,21 @@ import (
)

const (
peerFlowTaskQueue = "peer-flow-task-queue"
snapshotFlowTaskQueue = "snapshot-flow-task-queue"
// Task Queues
peerFlowTaskQueue = "peer-flow-task-queue"
snapshotFlowTaskQueue = "snapshot-flow-task-queue"

// Signals
CDCFlowSignalName = "peer-flow-signal"
CDCDynamicPropertiesSignalName = "cdc-dynamic-properties"

// Queries
CDCFlowStateQuery = "q-cdc-flow-status"
QRepFlowStateQuery = "q-qrep-flow-state"
FlowStatusQuery = "q-flow-status"

// Updates
FlowStatusUpdate = "u-flow-status"
)

const MirrorNameSearchAttribute = "MirrorName"
Expand Down
40 changes: 28 additions & 12 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
)

const (
CDCFlowStatusQuery = "q-cdc-flow-status"
maxSyncFlowsPerCDCFlow = 32
)

Expand All @@ -41,17 +40,15 @@ type CDCFlowWorkflowState struct {
NormalizeFlowStatuses []*model.NormalizeResponse
// Current signalled state of the peer flow.
ActiveSignal shared.CDCFlowSignal
// SetupComplete indicates whether the peer flow setup has completed.
SetupComplete bool
// SnapshotComplete indicates whether the initial snapshot workflow has completed.
SnapshotComplete bool
// Errors encountered during child sync flow executions.
SyncFlowErrors []string
// Errors encountered during child normalize flow executions.
NormalizeFlowErrors []string
// Global mapping of relation IDs to RelationMessages sent as a part of logical replication.
// Needed to support schema changes.
RelationMessageMapping model.RelationMessageMapping
// current workflow state
CurrentFlowState *protos.FlowStatus
}

type SignalProps struct {
Expand All @@ -66,7 +63,6 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
SyncFlowStatuses: nil,
NormalizeFlowStatuses: nil,
ActiveSignal: shared.NoopSignal,
SetupComplete: false,
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
// WORKAROUND: empty maps are protobufed into nil maps for reasons beyond me
Expand All @@ -76,6 +72,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
RelationName: "protobuf_workaround",
},
},
CurrentFlowState: protos.FlowStatus_STATUS_SETUP.Enum(),
}
}

Expand Down Expand Up @@ -168,12 +165,24 @@ func CDCFlowWorkflowWithConfig(
limits.TotalSyncFlows = maxSyncFlowsPerCDCFlow
}

// Support a Query for the current state of the peer flow.
err := workflow.SetQueryHandler(ctx, CDCFlowStatusQuery, func(jobName string) (CDCFlowWorkflowState, error) {
err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) {
return *state, nil
})
if err != nil {
return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err)
return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err)
}
err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) {
return state.CurrentFlowState, nil
})
if err != nil {
return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err)
}
err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error {
state.CurrentFlowState = status
return nil
})
if err != nil {
return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.FlowStatusUpdate, err)
}

mirrorNameSearch := map[string]interface{}{
Expand All @@ -184,7 +193,7 @@ func CDCFlowWorkflowWithConfig(
// because Resync modifies TableMappings before Setup and also before Snapshot
// for safety, rely on the idempotency of SetupFlow instead
// also, no signals are being handled until the loop starts, so no PAUSE/DROP will take effect here.
if !(state.SetupComplete && state.SnapshotComplete) {
if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING.Enum() {
// if resync is true, alter the table name schema mapping to temporarily add
// a suffix to the table names.
if cfg.Resync {
Expand Down Expand Up @@ -214,7 +223,7 @@ func CDCFlowWorkflowWithConfig(
if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil {
return state, fmt.Errorf("failed to execute child workflow: %w", err)
}
state.SetupComplete = true
state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT.Enum()

// next part of the setup is to snapshot-initial-copy and setup replication slots.
snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName)
Expand Down Expand Up @@ -276,7 +285,7 @@ func CDCFlowWorkflowWithConfig(
}
}

state.SnapshotComplete = true
state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum()
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")

// if initial_copy_only is opted for, we end the flow here.
Expand Down Expand Up @@ -324,6 +333,7 @@ func CDCFlowWorkflowWithConfig(

if state.ActiveSignal == shared.PauseSignal {
startTime := time.Now()
state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED.Enum()
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
var signalVal shared.CDCFlowSignal

Expand All @@ -335,13 +345,19 @@ func CDCFlowWorkflowWithConfig(
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
}
}

w.logger.Info("mirror has been resumed after ", time.Since(startTime))
}

// check if the peer flow has been shutdown
if state.ActiveSignal == shared.ShutdownSignal {
w.logger.Info("peer flow has been shutdown")
state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED.Enum()
return state, nil
}

state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum()

// check if total sync flows have been completed
// since this happens immediately after we check for signals, the case of a signal being missed
// due to a new workflow starting is vanishingly low, but possible
Expand Down
23 changes: 20 additions & 3 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,26 @@ func QRepFlowWorkflow(
maxParallelWorkers = int(config.MaxParallelWorkers)
}

// register a query to get the number of partitions processed
err := workflow.SetQueryHandler(ctx, "num-partitions-processed", func() (uint64, error) {
return state.NumPartitionsProcessed, nil
// Support a Query for the current state of the qrep flow.
err := workflow.SetQueryHandler(ctx, shared.QRepFlowStateQuery, func() (*protos.QRepFlowState, error) {
return state, nil
})
if err != nil {
return fmt.Errorf("failed to set `%s` query handler: %w", shared.QRepFlowStateQuery, err)
}

// Support a Query for the current status of the qrep flow.
err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) {
return &state.CurrentFlowState, nil
})
if err != nil {
return fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err)
}

// Support an Update for the current status of the qrep flow.
err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error {
state.CurrentFlowState = *status
return nil
})
if err != nil {
return fmt.Errorf("failed to register query handler: %w", err)
Expand Down
Loading

0 comments on commit c18b45e

Please sign in to comment.