From 942be7122c61b85159b174adba2da8951bb470e0 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 3 Jan 2024 09:56:11 -0500 Subject: [PATCH] adding visibility into current mirror state breaks some protos and routes for clearer naming conventions, UI needs to be updated accordingly removes SetupComplete and SnapshotComplete booleans in CDCFlowState in favour of a CurrentFlowState that expresses all states of a CDC workflow in a single variable. rebase of PR: https://github.com/PeerDB-io/peerdb/pull/657 --- flow/cmd/handler.go | 52 ++++++++++++++++++--- flow/cmd/mirror_status.go | 37 +++++++++++++++ flow/shared/constants.go | 15 +++++- flow/workflows/cdc_flow.go | 40 +++++++++++----- flow/workflows/qrep_flow.go | 23 +++++++-- nexus/flow-rs/src/grpc.rs | 48 +++++-------------- nexus/server/src/main.rs | 22 +++++++-- protos/flow.proto | 27 ++++++++++- protos/route.proto | 8 ++-- temporal-dynamicconfig/development-sql.yaml | 2 + 10 files changed, 205 insertions(+), 69 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 2a79baa8ca..643247b542 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -436,26 +436,55 @@ func (h *FlowRequestHandler) FlowStateChange( ctx context.Context, req *protos.FlowStateChangeRequest, ) (*protos.FlowStateChangeResponse, error) { - var err error - if req.RequestedFlowState == protos.FlowState_STATE_PAUSED { + workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) + if err != nil { + return nil, err + } + currState, err := h.getWorkflowStatus(ctx, workflowID) + if err != nil { + return nil, err + } + if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED && + *currState == protos.FlowStatus_STATUS_RUNNING { + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum()) + if err != nil { + return nil, err + } err = h.temporalClient.SignalWorkflow( ctx, - req.WorkflowId, + workflowID, "", shared.CDCFlowSignalName, shared.PauseSignal, ) - } else if req.RequestedFlowState == 
protos.FlowState_STATE_RUNNING { + } else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING && + *currState == protos.FlowStatus_STATUS_PAUSED { err = h.temporalClient.SignalWorkflow( ctx, - req.WorkflowId, + workflowID, "", shared.CDCFlowSignalName, shared.NoopSignal, ) + } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && + (*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) { + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum()) + if err != nil { + return nil, err + } + _, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{ + WorkflowId: workflowID, + FlowJobName: req.FlowJobName, + SourcePeer: req.SourcePeer, + DestinationPeer: req.DestinationPeer, + RemoveFlowEntry: false, + }) + } else { + return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v", + req.RequestedFlowState, currState) } if err != nil { - return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err) + return nil, fmt.Errorf("unable to signal CDCFlow workflow: %w", err) } return &protos.FlowStateChangeResponse{ @@ -710,3 +739,14 @@ func (h *FlowRequestHandler) DropPeer( Ok: true, }, nil } + +func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName string) (string, error) { + q := "SELECT workflow_id FROM flows WHERE name ILIKE $1" + row := h.pool.QueryRow(ctx, q, flowJobName) + var workflowID string + if err := row.Scan(&workflowID); err != nil { + return "", fmt.Errorf("unable to get workflowID for flow job %s: %w", flowJobName, err) + } + + return workflowID, nil +} diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index c87a9434df..332e186665 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -7,6 +7,7 @@ import ( "log/slog" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgtype" "google.golang.org/protobuf/proto" 
"google.golang.org/protobuf/types/known/timestamppb" @@ -23,6 +24,18 @@ func (h *FlowRequestHandler) MirrorStatus( }, nil } + workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) + if err != nil { + return nil, err + } + + currState, err := h.getWorkflowStatus(ctx, workflowID) + if err != nil { + return &protos.MirrorStatusResponse{ + ErrorMessage: fmt.Sprintf("unable to get flow state: %s", err.Error()), + }, nil + } + if cdcFlow { cdcStatus, err := h.CDCFlowStatus(ctx, req) if err != nil { @@ -36,6 +49,7 @@ func (h *FlowRequestHandler) MirrorStatus( Status: &protos.MirrorStatusResponse_CdcStatus{ CdcStatus: cdcStatus, }, + CurrentFlowState: *currState, }, nil } else { qrepStatus, err := h.QRepFlowStatus(ctx, req) @@ -50,6 +64,7 @@ func (h *FlowRequestHandler) MirrorStatus( Status: &protos.MirrorStatusResponse_QrepStatus{ QrepStatus: qrepStatus, }, + CurrentFlowState: *currState, }, nil } } @@ -272,3 +287,25 @@ func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJob return flowNames, nil } + +func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (*protos.FlowStatus, error) { + res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery) + if err != nil { + return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + } + var state *protos.FlowStatus + err = res.Get(&state) + if err != nil { + return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + } + return state, nil +} + +func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context, + workflowID string, state *protos.FlowStatus) error { + _, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state) + if err != nil { + return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, err) + } + return nil +} diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 40de25425d..07cfea307a 
100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -7,10 +7,21 @@ import ( ) const ( - peerFlowTaskQueue = "peer-flow-task-queue" - snapshotFlowTaskQueue = "snapshot-flow-task-queue" + // Task Queues + peerFlowTaskQueue = "peer-flow-task-queue" + snapshotFlowTaskQueue = "snapshot-flow-task-queue" + + // Signals CDCFlowSignalName = "peer-flow-signal" CDCDynamicPropertiesSignalName = "cdc-dynamic-properties" + + // Queries + CDCFlowStateQuery = "q-cdc-flow-status" + QRepFlowStateQuery = "q-qrep-flow-state" + FlowStatusQuery = "q-flow-status" + + // Updates + FlowStatusUpdate = "u-flow-status" ) const MirrorNameSearchAttribute = "MirrorName" diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 0de2ef514b..e0ede98131 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -17,7 +17,6 @@ import ( ) const ( - CDCFlowStatusQuery = "q-cdc-flow-status" maxSyncFlowsPerCDCFlow = 32 ) @@ -41,10 +40,6 @@ type CDCFlowWorkflowState struct { NormalizeFlowStatuses []*model.NormalizeResponse // Current signalled state of the peer flow. ActiveSignal shared.CDCFlowSignal - // SetupComplete indicates whether the peer flow setup has completed. - SetupComplete bool - // SnapshotComplete indicates whether the initial snapshot workflow has completed. - SnapshotComplete bool // Errors encountered during child sync flow executions. SyncFlowErrors []string // Errors encountered during child sync flow executions. @@ -52,6 +47,8 @@ type CDCFlowWorkflowState struct { // Global mapping of relation IDs to RelationMessages sent as a part of logical replication. // Needed to support schema changes. 
RelationMessageMapping model.RelationMessageMapping + // current workflow state + CurrentFlowState *protos.FlowStatus } type SignalProps struct { @@ -66,7 +63,6 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { SyncFlowStatuses: nil, NormalizeFlowStatuses: nil, ActiveSignal: shared.NoopSignal, - SetupComplete: false, SyncFlowErrors: nil, NormalizeFlowErrors: nil, // WORKAROUND: empty maps are protobufed into nil maps for reasons beyond me @@ -76,6 +72,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { RelationName: "protobuf_workaround", }, }, + CurrentFlowState: protos.FlowStatus_STATUS_SETUP.Enum(), } } @@ -168,12 +165,24 @@ func CDCFlowWorkflowWithConfig( limits.TotalSyncFlows = maxSyncFlowsPerCDCFlow } - // Support a Query for the current state of the peer flow. - err := workflow.SetQueryHandler(ctx, CDCFlowStatusQuery, func(jobName string) (CDCFlowWorkflowState, error) { + err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) { return *state, nil }) if err != nil { - return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err) + return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err) + } + err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) { + return state.CurrentFlowState, nil + }) + if err != nil { + return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) + } + err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { + state.CurrentFlowState = status + return nil + }) + if err != nil { + return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.FlowStatusUpdate, err) } mirrorNameSearch := map[string]interface{}{ @@ -184,7 +193,7 @@ func CDCFlowWorkflowWithConfig( // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of 
SetupFlow instead // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. - if !(state.SetupComplete && state.SnapshotComplete) { + if state.CurrentFlowState == nil || *state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING { // value comparison; Enum() allocates a new pointer per call. // if resync is true, alter the table name schema mapping to temporarily add // a suffix to the table names. if cfg.Resync { @@ -214,7 +223,7 @@ func CDCFlowWorkflowWithConfig( if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) } - state.SetupComplete = true + state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT.Enum() // next part of the setup is to snapshot-initial-copy and setup replication slots. snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName) @@ -276,7 +285,7 @@ func CDCFlowWorkflowWithConfig( } } - state.SnapshotComplete = true + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() state.Progress = append(state.Progress, "executed setup flow and snapshot flow") // if initial_copy_only is opted for, we end the flow here. 
@@ -324,6 +333,7 @@ func CDCFlowWorkflowWithConfig( if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() + state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED.Enum() signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) var signalVal shared.CDCFlowSignal @@ -335,13 +345,19 @@ func CDCFlowWorkflowWithConfig( state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) } } + + w.logger.Info("mirror has been resumed", "pausedFor", time.Since(startTime)) } + // check if the peer flow has been shutdown if state.ActiveSignal == shared.ShutdownSignal { w.logger.Info("peer flow has been shutdown") + state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED.Enum() return state, nil } + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() + // check if total sync flows have been completed // since this happens immediately after we check for signals, the case of a signal being missed // due to a new workflow starting is vanishingly low, but possible diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index d149ce2e62..0b78a6dc43 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -396,9 +396,26 @@ func QRepFlowWorkflow( maxParallelWorkers = int(config.MaxParallelWorkers) } - // register a query to get the number of partitions processed - err := workflow.SetQueryHandler(ctx, "num-partitions-processed", func() (uint64, error) { - return state.NumPartitionsProcessed, nil + // Support a Query for the current state of the qrep flow. + err := workflow.SetQueryHandler(ctx, shared.QRepFlowStateQuery, func() (*protos.QRepFlowState, error) { + return state, nil + }) + if err != nil { + return fmt.Errorf("failed to set `%s` query handler: %w", shared.QRepFlowStateQuery, err) + } + + // Support a Query for the current status of the qrep flow. 
+ err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) { + return &state.CurrentFlowState, nil + }) + if err != nil { + return fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) + } + + // Support an Update for the current status of the qrep flow. + err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { + state.CurrentFlowState = *status + return nil }) if err != nil { return fmt.Errorf("failed to register query handler: %w", err) diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 7a051e3d23..68a41d0d1d 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -86,30 +86,6 @@ impl FlowGrpcClient { Ok(workflow_id) } - pub async fn shutdown_flow_job( - &mut self, - flow_job_name: &str, - workflow_details: WorkflowDetails, - ) -> anyhow::Result<()> { - let shutdown_flow_req = pt::peerdb_route::ShutdownRequest { - flow_job_name: flow_job_name.to_string(), - workflow_id: workflow_details.workflow_id, - source_peer: Some(workflow_details.source_peer), - destination_peer: Some(workflow_details.destination_peer), - remove_flow_entry: false, - }; - let response = self.client.shutdown_flow(shutdown_flow_req).await?; - let shutdown_response = response.into_inner(); - if shutdown_response.ok { - Ok(()) - } else { - Err(anyhow::anyhow!(format!( - "failed to shutdown flow job: {:?}", - shutdown_response.error_message - ))) - } - } - pub async fn drop_peer(&mut self, peer_name: &str) -> anyhow::Result<()> { let drop_peer_req = pt::peerdb_route::DropPeerRequest { peer_name: String::from(peer_name), @@ -129,25 +105,23 @@ impl FlowGrpcClient { pub async fn flow_state_change( &mut self, flow_job_name: &str, - workflow_id: &str, - pause: bool, + workflow_details: WorkflowDetails, + state: pt::peerdb_flow::FlowStatus, ) -> anyhow::Result<()> { - let pause_flow_req = pt::peerdb_route::FlowStateChangeRequest { + let state_change_req = 
pt::peerdb_route::FlowStateChangeRequest { flow_job_name: flow_job_name.to_owned(), - workflow_id: workflow_id.to_owned(), - requested_flow_state: match pause { - true => pt::peerdb_route::FlowState::StatePaused.into(), - false => pt::peerdb_route::FlowState::StateRunning.into(), - }, + requested_flow_state: state.into(), + source_peer: Some(workflow_details.source_peer), + destination_peer: Some(workflow_details.destination_peer), }; - let response = self.client.flow_state_change(pause_flow_req).await?; - let pause_response = response.into_inner(); - if pause_response.ok { + let response = self.client.flow_state_change(state_change_req).await?; + let state_change_response = response.into_inner(); + if state_change_response.ok { Ok(()) } else { Err(anyhow::anyhow!(format!( - "failed to pause/unpause flow job: {:?}", - pause_response.error_message + "failed to change the state of flow job {}: {:?}", + flow_job_name, state_change_response.error_message ))) } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 9aa4dcb9a4..bb2219512e 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -273,7 +273,11 @@ impl NexusBackend { if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler - .shutdown_flow_job(flow_job_name, workflow_details) + .flow_state_change( + flow_job_name, + workflow_details, + pt::peerdb_flow::FlowStatus::StatusTerminated, + ) .await .map_err(|err| { PgWireError::ApiError( @@ -693,11 +697,15 @@ impl NexusBackend { if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler - .flow_state_change(flow_job_name, &workflow_details.workflow_id, true) + .flow_state_change( + flow_job_name, + workflow_details, + pt::peerdb_flow::FlowStatus::StatusPaused, + ) .await .map_err(|err| { PgWireError::ApiError( - format!("unable to shutdown flow job: {:?}", err).into(), + 
format!("unable to pause flow job: {:?}", err).into(), ) })?; let drop_mirror_success = format!("PAUSE MIRROR {}", flow_job_name); @@ -752,11 +760,15 @@ impl NexusBackend { if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler - .flow_state_change(flow_job_name, &workflow_details.workflow_id, false) + .flow_state_change( + flow_job_name, + workflow_details, + pt::peerdb_flow::FlowStatus::StatusRunning, + ) .await .map_err(|err| { PgWireError::ApiError( - format!("unable to shutdown flow job: {:?}", err).into(), + format!("unable to resume flow job: {:?}", err).into(), ) })?; let drop_mirror_success = format!("RESUME MIRROR {}", flow_job_name); diff --git a/protos/flow.proto b/protos/flow.proto index d6e4ab76e8..2a8f08b6b9 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -382,6 +382,7 @@ message QRepFlowState { uint64 num_partitions_processed = 2; bool needs_resync = 3; bool disable_wait_for_new_rows = 4; + FlowStatus current_flow_state = 5; } message PeerDBColumns { @@ -393,4 +394,28 @@ message PeerDBColumns { message GetOpenConnectionsForUserResult { string user_name = 1; int64 current_open_connections = 2; -} \ No newline at end of file +} + +// UI reads current workflow status and also requests status changes using same enum +// UI can request STATUS_PAUSED, STATUS_RUNNING and STATUS_TERMINATED +// STATUS_RUNNING -> STATUS_PAUSED/STATUS_TERMINATED +// STATUS_PAUSED -> STATUS_RUNNING/STATUS_TERMINATED +// UI can read everything except STATUS_UNKNOWN +enum FlowStatus { + // should never be read by UI, bail + STATUS_UNKNOWN = 0; + // enable pause and terminate buttons + STATUS_RUNNING = 1; + // pause button becomes resume button, terminate button should still be enabled + STATUS_PAUSED = 2; + // neither button should be enabled + STATUS_PAUSING = 3; + // neither button should be enabled, not reachable in QRep mirrors + STATUS_SETUP = 4; + // neither button should be enabled, 
not reachable in QRep mirrors + STATUS_SNAPSHOT = 5; + // neither button should be enabled + STATUS_TERMINATING = 6; + // neither button should be enabled + STATUS_TERMINATED = 7; +} diff --git a/protos/route.proto b/protos/route.proto index 90ee7698d4..126c97c154 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -179,6 +179,7 @@ message MirrorStatusResponse { CDCMirrorStatus cdc_status = 3; } string error_message = 4; + peerdb_flow.FlowStatus current_flow_state = 5; } // in the future, consider moving DropFlow to this and reduce route surface @@ -189,9 +190,10 @@ enum FlowState { } message FlowStateChangeRequest { - string workflow_id = 1; - string flow_job_name = 2; - FlowState requested_flow_state = 3; + string flow_job_name = 1; + peerdb_flow.FlowStatus requested_flow_state = 2; + peerdb_peers.Peer source_peer = 3; + peerdb_peers.Peer destination_peer = 4; } message FlowStateChangeResponse { diff --git a/temporal-dynamicconfig/development-sql.yaml b/temporal-dynamicconfig/development-sql.yaml index 8862dfad72..3ec67ddf69 100755 --- a/temporal-dynamicconfig/development-sql.yaml +++ b/temporal-dynamicconfig/development-sql.yaml @@ -4,3 +4,5 @@ limit.maxIDLength: system.forceSearchAttributesCacheRefreshOnRead: - value: true # Dev setup only. Please don't turn this on in production. constraints: {} +frontend.enableUpdateWorkflowExecution: + - value: true # to enable external updates of workflow status [PAUSING, TERMINATING] \ No newline at end of file