Skip to content

Commit

Permalink
adding visibility into current mirror state
Browse files Browse the repository at this point in the history
breaks some protos and routes for clearer naming conventions, UI needs to be updated accordingly

removes SetupComplete and SnapshotComplete booleans in CDCFlowState in favour of a CurrentFlowState that expresses all states of a CDC workflow in a single variable.

rebase of PR: #657
  • Loading branch information
iskakaushik committed Jan 3, 2024
1 parent 7165db7 commit 942be71
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 69 deletions.
52 changes: 46 additions & 6 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,26 +436,55 @@ func (h *FlowRequestHandler) FlowStateChange(
ctx context.Context,
req *protos.FlowStateChangeRequest,
) (*protos.FlowStateChangeResponse, error) {
var err error
if req.RequestedFlowState == protos.FlowState_STATE_PAUSED {
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
}
currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
return nil, err
}
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
*currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum())
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
workflowID,
"",
shared.CDCFlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowState_STATE_RUNNING {
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
*currState == protos.FlowStatus_STATUS_PAUSED {
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
workflowID,
"",
shared.CDCFlowSignalName,
shared.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum())
if err != nil {
return nil, err
}
_, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{
WorkflowId: workflowID,
FlowJobName: req.FlowJobName,
SourcePeer: req.SourcePeer,
DestinationPeer: req.DestinationPeer,
RemoveFlowEntry: false,
})
} else {
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
return nil, fmt.Errorf("unable to signal CDCFlow workflow: %w", err)
}

return &protos.FlowStateChangeResponse{
Expand Down Expand Up @@ -710,3 +739,14 @@ func (h *FlowRequestHandler) DropPeer(
Ok: true,
}, nil
}

func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName string) (string, error) {
q := "SELECT workflow_id FROM flows WHERE name ILIKE $1"
row := h.pool.QueryRow(ctx, q, flowJobName)
var workflowID string
if err := row.Scan(&workflowID); err != nil {
return "", fmt.Errorf("unable to get workflowID for flow job %s: %w", flowJobName, err)
}

return workflowID, nil
}
37 changes: 37 additions & 0 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5/pgtype"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -23,6 +24,18 @@ func (h *FlowRequestHandler) MirrorStatus(
}, nil
}

workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to get flow state: %s", err.Error()),
}, nil
}

if cdcFlow {
cdcStatus, err := h.CDCFlowStatus(ctx, req)
if err != nil {
Expand All @@ -36,6 +49,7 @@ func (h *FlowRequestHandler) MirrorStatus(
Status: &protos.MirrorStatusResponse_CdcStatus{
CdcStatus: cdcStatus,
},
CurrentFlowState: *currState,
}, nil
} else {
qrepStatus, err := h.QRepFlowStatus(ctx, req)
Expand All @@ -50,6 +64,7 @@ func (h *FlowRequestHandler) MirrorStatus(
Status: &protos.MirrorStatusResponse_QrepStatus{
QrepStatus: qrepStatus,
},
CurrentFlowState: *currState,
}, nil
}
}
Expand Down Expand Up @@ -272,3 +287,25 @@ func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJob

return flowNames, nil
}

func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (*protos.FlowStatus, error) {
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery)
if err != nil {
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
var state *protos.FlowStatus
err = res.Get(&state)
if err != nil {
return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
return state, nil
}

func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context,
workflowID string, state *protos.FlowStatus) error {
_, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state)
if err != nil {
return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, err)
}
return nil
}
15 changes: 13 additions & 2 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,21 @@ import (
)

const (
peerFlowTaskQueue = "peer-flow-task-queue"
snapshotFlowTaskQueue = "snapshot-flow-task-queue"
// Task Queues
peerFlowTaskQueue = "peer-flow-task-queue"
snapshotFlowTaskQueue = "snapshot-flow-task-queue"

// Signals
CDCFlowSignalName = "peer-flow-signal"
CDCDynamicPropertiesSignalName = "cdc-dynamic-properties"

// Queries
CDCFlowStateQuery = "q-cdc-flow-status"
QRepFlowStateQuery = "q-qrep-flow-state"
FlowStatusQuery = "q-flow-status"

// Updates
FlowStatusUpdate = "u-flow-status"
)

const MirrorNameSearchAttribute = "MirrorName"
Expand Down
40 changes: 28 additions & 12 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
)

const (
CDCFlowStatusQuery = "q-cdc-flow-status"
maxSyncFlowsPerCDCFlow = 32
)

Expand All @@ -41,17 +40,15 @@ type CDCFlowWorkflowState struct {
NormalizeFlowStatuses []*model.NormalizeResponse
// Current signalled state of the peer flow.
ActiveSignal shared.CDCFlowSignal
// SetupComplete indicates whether the peer flow setup has completed.
SetupComplete bool
// SnapshotComplete indicates whether the initial snapshot workflow has completed.
SnapshotComplete bool
// Errors encountered during child sync flow executions.
SyncFlowErrors []string
// Errors encountered during child sync flow executions.
NormalizeFlowErrors []string
// Global mapping of relation IDs to RelationMessages sent as a part of logical replication.
// Needed to support schema changes.
RelationMessageMapping model.RelationMessageMapping
// current workflow state
CurrentFlowState *protos.FlowStatus
}

type SignalProps struct {
Expand All @@ -66,7 +63,6 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
SyncFlowStatuses: nil,
NormalizeFlowStatuses: nil,
ActiveSignal: shared.NoopSignal,
SetupComplete: false,
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
// WORKAROUND: empty maps are protobufed into nil maps for reasons beyond me
Expand All @@ -76,6 +72,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
RelationName: "protobuf_workaround",
},
},
CurrentFlowState: protos.FlowStatus_STATUS_SETUP.Enum(),
}
}

Expand Down Expand Up @@ -168,12 +165,24 @@ func CDCFlowWorkflowWithConfig(
limits.TotalSyncFlows = maxSyncFlowsPerCDCFlow
}

// Support a Query for the current state of the peer flow.
err := workflow.SetQueryHandler(ctx, CDCFlowStatusQuery, func(jobName string) (CDCFlowWorkflowState, error) {
err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) {
return *state, nil
})
if err != nil {
return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err)
return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err)
}
err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) {
return state.CurrentFlowState, nil
})
if err != nil {
return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err)
}
err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error {
state.CurrentFlowState = status
return nil
})
if err != nil {
return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.FlowStatusUpdate, err)
}

mirrorNameSearch := map[string]interface{}{
Expand All @@ -184,7 +193,7 @@ func CDCFlowWorkflowWithConfig(
// because Resync modifies TableMappings before Setup and also before Snapshot
// for safety, rely on the idempotency of SetupFlow instead
// also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here.
if !(state.SetupComplete && state.SnapshotComplete) {
if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING.Enum() {
// if resync is true, alter the table name schema mapping to temporarily add
// a suffix to the table names.
if cfg.Resync {
Expand Down Expand Up @@ -214,7 +223,7 @@ func CDCFlowWorkflowWithConfig(
if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil {
return state, fmt.Errorf("failed to execute child workflow: %w", err)
}
state.SetupComplete = true
state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT.Enum()

// next part of the setup is to snapshot-initial-copy and setup replication slots.
snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName)
Expand Down Expand Up @@ -276,7 +285,7 @@ func CDCFlowWorkflowWithConfig(
}
}

state.SnapshotComplete = true
state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum()
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")

// if initial_copy_only is opted for, we end the flow here.
Expand Down Expand Up @@ -324,6 +333,7 @@ func CDCFlowWorkflowWithConfig(

if state.ActiveSignal == shared.PauseSignal {
startTime := time.Now()
state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED.Enum()
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
var signalVal shared.CDCFlowSignal

Expand All @@ -335,13 +345,19 @@ func CDCFlowWorkflowWithConfig(
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
}
}

w.logger.Info("mirror has been resumed after ", time.Since(startTime))
}

// check if the peer flow has been shutdown
if state.ActiveSignal == shared.ShutdownSignal {
w.logger.Info("peer flow has been shutdown")
state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED.Enum()
return state, nil
}

state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum()

// check if total sync flows have been completed
// since this happens immediately after we check for signals, the case of a signal being missed
// due to a new workflow starting is vanishingly low, but possible
Expand Down
23 changes: 20 additions & 3 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,26 @@ func QRepFlowWorkflow(
maxParallelWorkers = int(config.MaxParallelWorkers)
}

// register a query to get the number of partitions processed
err := workflow.SetQueryHandler(ctx, "num-partitions-processed", func() (uint64, error) {
return state.NumPartitionsProcessed, nil
// Support a Query for the current state of the qrep flow.
err := workflow.SetQueryHandler(ctx, shared.QRepFlowStateQuery, func() (*protos.QRepFlowState, error) {
return state, nil
})
if err != nil {
return fmt.Errorf("failed to set `%s` query handler: %w", shared.QRepFlowStateQuery, err)
}

// Support a Query for the current status of the arep flow.
err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) {
return &state.CurrentFlowState, nil
})
if err != nil {
return fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err)
}

// Support an Update for the current status of the qrep flow.
err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error {
state.CurrentFlowState = *status
return nil
})
if err != nil {
return fmt.Errorf("failed to register query handler: %w", err)
Expand Down
48 changes: 11 additions & 37 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,6 @@ impl FlowGrpcClient {
Ok(workflow_id)
}

pub async fn shutdown_flow_job(
&mut self,
flow_job_name: &str,
workflow_details: WorkflowDetails,
) -> anyhow::Result<()> {
let shutdown_flow_req = pt::peerdb_route::ShutdownRequest {
flow_job_name: flow_job_name.to_string(),
workflow_id: workflow_details.workflow_id,
source_peer: Some(workflow_details.source_peer),
destination_peer: Some(workflow_details.destination_peer),
remove_flow_entry: false,
};
let response = self.client.shutdown_flow(shutdown_flow_req).await?;
let shutdown_response = response.into_inner();
if shutdown_response.ok {
Ok(())
} else {
Err(anyhow::anyhow!(format!(
"failed to shutdown flow job: {:?}",
shutdown_response.error_message
)))
}
}

pub async fn drop_peer(&mut self, peer_name: &str) -> anyhow::Result<()> {
let drop_peer_req = pt::peerdb_route::DropPeerRequest {
peer_name: String::from(peer_name),
Expand All @@ -129,25 +105,23 @@ impl FlowGrpcClient {
pub async fn flow_state_change(
&mut self,
flow_job_name: &str,
workflow_id: &str,
pause: bool,
workflow_details: WorkflowDetails,
state: pt::peerdb_flow::FlowStatus,
) -> anyhow::Result<()> {
let pause_flow_req = pt::peerdb_route::FlowStateChangeRequest {
let state_change_req = pt::peerdb_route::FlowStateChangeRequest {
flow_job_name: flow_job_name.to_owned(),
workflow_id: workflow_id.to_owned(),
requested_flow_state: match pause {
true => pt::peerdb_route::FlowState::StatePaused.into(),
false => pt::peerdb_route::FlowState::StateRunning.into(),
},
requested_flow_state: state.into(),
source_peer: Some(workflow_details.source_peer),
destination_peer: Some(workflow_details.destination_peer),
};
let response = self.client.flow_state_change(pause_flow_req).await?;
let pause_response = response.into_inner();
if pause_response.ok {
let response = self.client.flow_state_change(state_change_req).await?;
let state_change_response = response.into_inner();
if state_change_response.ok {
Ok(())
} else {
Err(anyhow::anyhow!(format!(
"failed to pause/unpause flow job: {:?}",
pause_response.error_message
"failed to change the state of flow job {}: {:?}",
flow_job_name, state_change_response.error_message
)))
}
}
Expand Down
Loading

0 comments on commit 942be71

Please sign in to comment.