Skip to content

Commit

Permalink
cmd/api.go start scheduler not heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 20, 2024
1 parent 36ffb11 commit d306fd7
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
return server, nil
}

func killExistingHeartbeatFlows(
func killExistingScheduleFlows(
ctx context.Context,
tc client.Client,
namespace string,
Expand All @@ -72,12 +72,12 @@ func killExistingHeartbeatFlows(
listRes, err := tc.ListWorkflow(ctx,
&workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Query: "WorkflowType = 'HeartbeatFlowWorkflow' AND TaskQueue = '" + taskQueue + "'",
Query: "WorkflowType = 'GlobalScheduleManagerWorkflow' AND TaskQueue = '" + taskQueue + "'",
})
if err != nil {
return fmt.Errorf("unable to list workflows: %w", err)
}
slog.Info("Requesting cancellation of pre-existing heartbeat flows")
slog.Info("Requesting cancellation of pre-existing scheduler flows")
for _, workflow := range listRes.Executions {
slog.Info("Cancelling workflow", slog.String("workflowId", workflow.Execution.WorkflowId))
err := tc.CancelWorkflow(ctx,
Expand Down Expand Up @@ -131,24 +131,24 @@ func APIMain(ctx context.Context, args *APIServerParams) error {

flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)

err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
err = killExistingScheduleFlows(ctx, tc, args.TemporalNamespace, taskQueue)
if err != nil {
return fmt.Errorf("unable to kill existing heartbeat flows: %w", err)
return fmt.Errorf("unable to kill existing scheduler flows: %w", err)
}

workflowID := fmt.Sprintf("heartbeatflow-%s", uuid.New())
workflowID := fmt.Sprintf("scheduler-%s", uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskQueue,
}

_, err = flowHandler.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.HeartbeatFlowWorkflow, // workflow function
ctx,
workflowOptions,
peerflow.GlobalScheduleManagerWorkflow,
)
if err != nil {
return fmt.Errorf("unable to start heartbeat workflow: %w", err)
return fmt.Errorf("unable to start scheduler workflow: %w", err)
}

protos.RegisterFlowServiceServer(grpcServer, flowHandler)
Expand Down

0 comments on commit d306fd7

Please sign in to comment.