diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 2a79baa8ca..643247b542 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -436,26 +436,55 @@ func (h *FlowRequestHandler) FlowStateChange( ctx context.Context, req *protos.FlowStateChangeRequest, ) (*protos.FlowStateChangeResponse, error) { - var err error - if req.RequestedFlowState == protos.FlowState_STATE_PAUSED { + workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) + if err != nil { + return nil, err + } + currState, err := h.getWorkflowStatus(ctx, workflowID) + if err != nil { + return nil, err + } + if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED && + *currState == protos.FlowStatus_STATUS_RUNNING { + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum()) + if err != nil { + return nil, err + } err = h.temporalClient.SignalWorkflow( ctx, - req.WorkflowId, + workflowID, "", shared.CDCFlowSignalName, shared.PauseSignal, ) - } else if req.RequestedFlowState == protos.FlowState_STATE_RUNNING { + } else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING && + *currState == protos.FlowStatus_STATUS_PAUSED { err = h.temporalClient.SignalWorkflow( ctx, - req.WorkflowId, + workflowID, "", shared.CDCFlowSignalName, shared.NoopSignal, ) + } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && + (*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) { + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum()) + if err != nil { + return nil, err + } + _, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{ + WorkflowId: workflowID, + FlowJobName: req.FlowJobName, + SourcePeer: req.SourcePeer, + DestinationPeer: req.DestinationPeer, + RemoveFlowEntry: false, + }) + } else { + return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v", + req.RequestedFlowState, currState) } if err != nil { - return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err) + return nil, fmt.Errorf("unable to signal CDCFlow workflow: %w", err) } return &protos.FlowStateChangeResponse{ @@ -710,3 +739,14 @@ func (h *FlowRequestHandler) DropPeer( Ok: true, }, nil } + +func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName string) (string, error) { + q := "SELECT workflow_id FROM flows WHERE name ILIKE $1" + row := h.pool.QueryRow(ctx, q, flowJobName) + var workflowID string + if err := row.Scan(&workflowID); err != nil { + return "", fmt.Errorf("unable to get workflowID for flow job %s: %w", flowJobName, err) + } + + return workflowID, nil +} diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index c87a9434df..332e186665 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -7,6 +7,7 @@ import ( "log/slog" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgtype" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" @@ -23,6 +24,18 @@ func (h *FlowRequestHandler) MirrorStatus( }, nil } + workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) + if err != nil { + return nil, err + } + + currState, err := h.getWorkflowStatus(ctx, workflowID) + if err != nil { + return &protos.MirrorStatusResponse{ + ErrorMessage: fmt.Sprintf("unable to get flow state: %s", err.Error()), + }, nil + } + if cdcFlow { cdcStatus, err := h.CDCFlowStatus(ctx, req) if err != nil { @@ -36,6 +49,7 @@ func (h *FlowRequestHandler) MirrorStatus( Status: &protos.MirrorStatusResponse_CdcStatus{ CdcStatus: cdcStatus, }, + CurrentFlowState: *currState, }, nil } else { qrepStatus, err := h.QRepFlowStatus(ctx, req) @@ -50,6 +64,7 @@ func (h *FlowRequestHandler) MirrorStatus( Status: &protos.MirrorStatusResponse_QrepStatus{ QrepStatus: qrepStatus, }, + CurrentFlowState: *currState, }, nil } } @@ -272,3 +287,25 @@ func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJob return flowNames, nil } + +func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (*protos.FlowStatus, error) { + res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery) + if err != nil { + return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + } + var state *protos.FlowStatus + err = res.Get(&state) + if err != nil { + return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + } + return state, nil +} + +func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context, + workflowID string, state *protos.FlowStatus) error { + _, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state) + if err != nil { + return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, err) + } + return nil +} diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 40de25425d..07cfea307a 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -7,10 +7,21 @@ import ( ) const ( - peerFlowTaskQueue = "peer-flow-task-queue" - snapshotFlowTaskQueue = "snapshot-flow-task-queue" + // Task Queues + peerFlowTaskQueue = "peer-flow-task-queue" + snapshotFlowTaskQueue = "snapshot-flow-task-queue" + + // Signals CDCFlowSignalName = "peer-flow-signal" CDCDynamicPropertiesSignalName = "cdc-dynamic-properties" + + // Queries + CDCFlowStateQuery = "q-cdc-flow-status" + QRepFlowStateQuery = "q-qrep-flow-state" + FlowStatusQuery = "q-flow-status" + + // Updates + FlowStatusUpdate = "u-flow-status" ) const MirrorNameSearchAttribute = "MirrorName" diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 0de2ef514b..e0ede98131 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -17,7 +17,6 @@ import ( ) const ( - CDCFlowStatusQuery = "q-cdc-flow-status" maxSyncFlowsPerCDCFlow = 32 ) @@ -41,10 +40,6 @@ type CDCFlowWorkflowState struct { NormalizeFlowStatuses []*model.NormalizeResponse // Current signalled state of the peer flow. ActiveSignal shared.CDCFlowSignal - // SetupComplete indicates whether the peer flow setup has completed. - SetupComplete bool - // SnapshotComplete indicates whether the initial snapshot workflow has completed. - SnapshotComplete bool // Errors encountered during child sync flow executions. SyncFlowErrors []string // Errors encountered during child sync flow executions. @@ -52,6 +47,8 @@ type CDCFlowWorkflowState struct { // Global mapping of relation IDs to RelationMessages sent as a part of logical replication. // Needed to support schema changes. RelationMessageMapping model.RelationMessageMapping + // current workflow state + CurrentFlowState *protos.FlowStatus } type SignalProps struct { @@ -66,7 +63,6 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { SyncFlowStatuses: nil, NormalizeFlowStatuses: nil, ActiveSignal: shared.NoopSignal, - SetupComplete: false, SyncFlowErrors: nil, NormalizeFlowErrors: nil, // WORKAROUND: empty maps are protobufed into nil maps for reasons beyond me @@ -76,6 +72,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { RelationName: "protobuf_workaround", }, }, + CurrentFlowState: protos.FlowStatus_STATUS_SETUP.Enum(), } } @@ -168,12 +165,24 @@ func CDCFlowWorkflowWithConfig( limits.TotalSyncFlows = maxSyncFlowsPerCDCFlow } - // Support a Query for the current state of the peer flow. - err := workflow.SetQueryHandler(ctx, CDCFlowStatusQuery, func(jobName string) (CDCFlowWorkflowState, error) { + err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) { return *state, nil }) if err != nil { - return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err) + return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err) + } + err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) { + return state.CurrentFlowState, nil + }) + if err != nil { + return state, fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) + } + err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { + state.CurrentFlowState = status + return nil + }) + if err != nil { + return state, fmt.Errorf("failed to set `%s` update handler: %w", shared.FlowStatusUpdate, err) } mirrorNameSearch := map[string]interface{}{ @@ -184,7 +193,7 @@ func CDCFlowWorkflowWithConfig( // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. - if !(state.SetupComplete && state.SnapshotComplete) { + if state.CurrentFlowState != protos.FlowStatus_STATUS_RUNNING.Enum() { // if resync is true, alter the table name schema mapping to temporarily add // a suffix to the table names. if cfg.Resync { @@ -214,7 +223,7 @@ func CDCFlowWorkflowWithConfig( if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) } - state.SetupComplete = true + state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT.Enum() // next part of the setup is to snapshot-initial-copy and setup replication slots. snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName) @@ -276,7 +285,7 @@ func CDCFlowWorkflowWithConfig( } } - state.SnapshotComplete = true + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() state.Progress = append(state.Progress, "executed setup flow and snapshot flow") // if initial_copy_only is opted for, we end the flow here. @@ -324,6 +333,7 @@ func CDCFlowWorkflowWithConfig( if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() + state.CurrentFlowState = protos.FlowStatus_STATUS_PAUSED.Enum() signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) var signalVal shared.CDCFlowSignal @@ -335,13 +345,19 @@ func CDCFlowWorkflowWithConfig( state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) } } + + w.logger.Info("mirror has been resumed after ", time.Since(startTime)) } + // check if the peer flow has been shutdown if state.ActiveSignal == shared.ShutdownSignal { w.logger.Info("peer flow has been shutdown") + state.CurrentFlowState = protos.FlowStatus_STATUS_TERMINATED.Enum() return state, nil } + state.CurrentFlowState = protos.FlowStatus_STATUS_RUNNING.Enum() + // check if total sync flows have been completed // since this happens immediately after we check for signals, the case of a signal being missed // due to a new workflow starting is vanishingly low, but possible diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index d149ce2e62..0b78a6dc43 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -396,9 +396,26 @@ func QRepFlowWorkflow( maxParallelWorkers = int(config.MaxParallelWorkers) } - // register a query to get the number of partitions processed - err := workflow.SetQueryHandler(ctx, "num-partitions-processed", func() (uint64, error) { - return state.NumPartitionsProcessed, nil + // Support a Query for the current state of the qrep flow. + err := workflow.SetQueryHandler(ctx, shared.QRepFlowStateQuery, func() (*protos.QRepFlowState, error) { + return state, nil + }) + if err != nil { + return fmt.Errorf("failed to set `%s` query handler: %w", shared.QRepFlowStateQuery, err) + } + + // Support a Query for the current status of the arep flow. + err = workflow.SetQueryHandler(ctx, shared.FlowStatusQuery, func() (*protos.FlowStatus, error) { + return &state.CurrentFlowState, nil + }) + if err != nil { + return fmt.Errorf("failed to set `%s` query handler: %w", shared.FlowStatusQuery, err) + } + + // Support an Update for the current status of the qrep flow. + err = workflow.SetUpdateHandler(ctx, shared.FlowStatusUpdate, func(status *protos.FlowStatus) error { + state.CurrentFlowState = *status + return nil }) if err != nil { return fmt.Errorf("failed to register query handler: %w", err) diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 7a051e3d23..68a41d0d1d 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -86,30 +86,6 @@ impl FlowGrpcClient { Ok(workflow_id) } - pub async fn shutdown_flow_job( - &mut self, - flow_job_name: &str, - workflow_details: WorkflowDetails, - ) -> anyhow::Result<()> { - let shutdown_flow_req = pt::peerdb_route::ShutdownRequest { - flow_job_name: flow_job_name.to_string(), - workflow_id: workflow_details.workflow_id, - source_peer: Some(workflow_details.source_peer), - destination_peer: Some(workflow_details.destination_peer), - remove_flow_entry: false, - }; - let response = self.client.shutdown_flow(shutdown_flow_req).await?; - let shutdown_response = response.into_inner(); - if shutdown_response.ok { - Ok(()) - } else { - Err(anyhow::anyhow!(format!( - "failed to shutdown flow job: {:?}", - shutdown_response.error_message - ))) - } - } - pub async fn drop_peer(&mut self, peer_name: &str) -> anyhow::Result<()> { let drop_peer_req = pt::peerdb_route::DropPeerRequest { peer_name: String::from(peer_name), @@ -129,25 +105,23 @@ impl FlowGrpcClient { pub async fn flow_state_change( &mut self, flow_job_name: &str, - workflow_id: &str, - pause: bool, + workflow_details: WorkflowDetails, + state: pt::peerdb_flow::FlowStatus, ) -> anyhow::Result<()> { - let pause_flow_req = pt::peerdb_route::FlowStateChangeRequest { + let state_change_req = pt::peerdb_route::FlowStateChangeRequest { flow_job_name: flow_job_name.to_owned(), - workflow_id: workflow_id.to_owned(), - requested_flow_state: match pause { - true => pt::peerdb_route::FlowState::StatePaused.into(), - false => pt::peerdb_route::FlowState::StateRunning.into(), - }, + requested_flow_state: state.into(), + source_peer: Some(workflow_details.source_peer), + destination_peer: Some(workflow_details.destination_peer), }; - let response = self.client.flow_state_change(pause_flow_req).await?; - let pause_response = response.into_inner(); - if pause_response.ok { + let response = self.client.flow_state_change(state_change_req).await?; + let state_change_response = response.into_inner(); + if state_change_response.ok { Ok(()) } else { Err(anyhow::anyhow!(format!( - "failed to pause/unpause flow job: {:?}", - pause_response.error_message + "failed to change the state of flow job {}: {:?}", + flow_job_name, state_change_response.error_message ))) } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 9aa4dcb9a4..bb2219512e 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -273,7 +273,11 @@ impl NexusBackend { if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler - .shutdown_flow_job(flow_job_name, workflow_details) + .flow_state_change( + flow_job_name, + workflow_details, + pt::peerdb_flow::FlowStatus::StatusTerminated, + ) .await .map_err(|err| { PgWireError::ApiError( @@ -693,11 +697,15 @@ impl NexusBackend { if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler - .flow_state_change(flow_job_name, &workflow_details.workflow_id, true) + .flow_state_change( + flow_job_name, + workflow_details, + pt::peerdb_flow::FlowStatus::StatusPaused, + ) .await .map_err(|err| { PgWireError::ApiError( - format!("unable to shutdown flow job: {:?}", err).into(), + format!("unable to pause flow job: {:?}", err).into(), ) })?; let drop_mirror_success = format!("PAUSE MIRROR {}", flow_job_name); @@ -752,11 +760,15 @@ impl NexusBackend { if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler - .flow_state_change(flow_job_name, &workflow_details.workflow_id, false) + .flow_state_change( + flow_job_name, + workflow_details, + pt::peerdb_flow::FlowStatus::StatusRunning, + ) .await .map_err(|err| { PgWireError::ApiError( - format!("unable to shutdown flow job: {:?}", err).into(), + format!("unable to resume flow job: {:?}", err).into(), ) })?; let drop_mirror_success = format!("RESUME MIRROR {}", flow_job_name); diff --git a/protos/flow.proto b/protos/flow.proto index d6e4ab76e8..2a8f08b6b9 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -382,6 +382,7 @@ message QRepFlowState { uint64 num_partitions_processed = 2; bool needs_resync = 3; bool disable_wait_for_new_rows = 4; + FlowStatus current_flow_state = 5; } message PeerDBColumns { @@ -393,4 +394,28 @@ message PeerDBColumns { message GetOpenConnectionsForUserResult { string user_name = 1; int64 current_open_connections = 2; -} \ No newline at end of file +} + +// UI reads current workflow status and also requests status changes using same enum +// UI can request STATUS_PAUSED, STATUS_RUNNING and STATUS_TERMINATED +// STATUS_RUNNING -> STATUS_PAUSED/STATUS_TERMINATED +// STATUS_PAUSED -> STATUS_RUNNING/STATUS_TERMINATED +// UI can read everything except STATUS_UNKNOWN +enum FlowStatus { + // should never be read by UI, bail + STATUS_UNKNOWN = 0; + // enable pause and terminate buttons + STATUS_RUNNING = 1; + // pause button becomes resume button, terminate button should still be enabled + STATUS_PAUSED = 2; + // neither button should be enabled + STATUS_PAUSING = 3; + // neither button should be enabled, not reachable in QRep mirrors + STATUS_SETUP = 4; + // neither button should be enabled, not reachable in QRep mirrors + STATUS_SNAPSHOT = 5; + // neither button should be enabled + STATUS_TERMINATING = 6; + // neither button should be enabled + STATUS_TERMINATED = 7; +} diff --git a/protos/route.proto b/protos/route.proto index 90ee7698d4..126c97c154 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -179,6 +179,7 @@ message MirrorStatusResponse { CDCMirrorStatus cdc_status = 3; } string error_message = 4; + peerdb_flow.FlowStatus current_flow_state = 5; } // in the future, consider moving DropFlow to this and reduce route surface @@ -189,9 +190,10 @@ enum FlowState { } message FlowStateChangeRequest { - string workflow_id = 1; - string flow_job_name = 2; - FlowState requested_flow_state = 3; + string flow_job_name = 1; + peerdb_flow.FlowStatus requested_flow_state = 2; + peerdb_peers.Peer source_peer = 3; + peerdb_peers.Peer destination_peer = 4; } message FlowStateChangeResponse { diff --git a/temporal-dynamicconfig/development-sql.yaml b/temporal-dynamicconfig/development-sql.yaml index 8862dfad72..3ec67ddf69 100755 --- a/temporal-dynamicconfig/development-sql.yaml +++ b/temporal-dynamicconfig/development-sql.yaml @@ -4,3 +4,5 @@ limit.maxIDLength: system.forceSearchAttributesCacheRefreshOnRead: - value: true # Dev setup only. Please don't turn this on in production. constraints: {} +frontend.enableUpdateWorkflowExecution: + - value: true # to enable external updates of workflow status [PAUSING, TERMINATING] \ No newline at end of file