Skip to content

Commit

Permalink
fixed bugs and updated temporal config
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 15, 2023
1 parent f28a138 commit 24821ef
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 17 deletions.
9 changes: 5 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (h *FlowRequestHandler) FlowStateChange(
return nil, err
}
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
currState == protos.FlowStatus_STATUS_RUNNING.Enum() {
*currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING.Enum())
if err != nil {
return nil, err
Expand All @@ -412,7 +412,7 @@ func (h *FlowRequestHandler) FlowStateChange(
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED.Enum() {
*currState == protos.FlowStatus_STATUS_PAUSED {
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
Expand All @@ -421,7 +421,7 @@ func (h *FlowRequestHandler) FlowStateChange(
shared.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState == protos.FlowStatus_STATUS_RUNNING.Enum() || currState == protos.FlowStatus_STATUS_PAUSED.Enum()) {
(*currState == protos.FlowStatus_STATUS_RUNNING || *currState == protos.FlowStatus_STATUS_PAUSED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING.Enum())
if err != nil {
return nil, err
Expand All @@ -434,7 +434,8 @@ func (h *FlowRequestHandler) FlowStateChange(
RemoveFlowEntry: false,
})
} else {
return nil, fmt.Errorf("illegal state change requested: %v", req.RequestedFlowState)
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal CDCFlow workflow: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (h *FlowRequestHandler) getCloneTableFlowNames(ctx context.Context, flowJob
}

func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName string) (string, error) {
q := "SELECT workflow_id FROM flows WHERE flow_name ILIKE $1"
q := "SELECT workflow_id FROM flows WHERE name ILIKE $1"
row := h.pool.QueryRow(ctx, q, flowJobName)
var workflowID string
if err := row.Scan(&workflowID); err != nil {
Expand Down
12 changes: 5 additions & 7 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,14 @@ impl FlowGrpcClient {
pub async fn flow_state_change(
&mut self,
flow_job_name: &str,
workflow_id: &str,
pause: bool,
workflow_details: WorkflowDetails,
state: pt::peerdb_flow::FlowStatus,
) -> anyhow::Result<()> {
let pause_flow_req = pt::peerdb_route::FlowStateChangeRequest {
flow_job_name: flow_job_name.to_owned(),
workflow_id: workflow_id.to_owned(),
requested_flow_state: match pause {
true => pt::peerdb_route::FlowState::StatePaused.into(),
false => pt::peerdb_route::FlowState::StateRunning.into(),
},
requested_flow_state: state.into(),
source_peer: Some(workflow_details.source_peer),
destination_peer: Some(workflow_details.destination_peer)
};
let response = self.client.flow_state_change(pause_flow_req).await?;
let pause_response = response.into_inner();
Expand Down
22 changes: 17 additions & 5 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,11 @@ impl NexusBackend {
if let Some(workflow_details) = workflow_details {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
flow_handler
.shutdown_flow_job(flow_job_name, workflow_details)
.flow_state_change(
flow_job_name,
workflow_details,
pt::peerdb_flow::FlowStatus::StatusTerminated,
)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
Expand Down Expand Up @@ -735,11 +739,15 @@ impl NexusBackend {
if let Some(workflow_details) = workflow_details {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
flow_handler
.flow_state_change(flow_job_name, &workflow_details.workflow_id, true)
.flow_state_change(
flow_job_name,
workflow_details,
pt::peerdb_flow::FlowStatus::StatusPaused,
)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to shutdown flow job: {:?}", err),
err_msg: format!("unable to pause flow job: {:?}", err),
}))
})?;
let drop_mirror_success = format!("PAUSE MIRROR {}", flow_job_name);
Expand Down Expand Up @@ -796,11 +804,15 @@ impl NexusBackend {
if let Some(workflow_details) = workflow_details {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
flow_handler
.flow_state_change(flow_job_name, &workflow_details.workflow_id, false)
.flow_state_change(
flow_job_name,
workflow_details,
pt::peerdb_flow::FlowStatus::StatusRunning,
)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to shutdown flow job: {:?}", err),
err_msg: format!("unable to resume flow job: {:?}", err),
}))
})?;
let drop_mirror_success = format!("RESUME MIRROR {}", flow_job_name);
Expand Down
2 changes: 2 additions & 0 deletions temporal-dynamicconfig/development-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ limit.maxIDLength:
system.forceSearchAttributesCacheRefreshOnRead:
- value: true # Dev setup only. Please don't turn this on in production.
constraints: {}
frontend.enableUpdateWorkflowExecution:
- value: true # to enable external updates of workflow status [PAUSING, TERMINATING]

0 comments on commit 24821ef

Please sign in to comment.