Skip to content

Commit

Permalink
logs for shutdown flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 12, 2023
1 parent 21cd6ec commit b63f88d
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
ctx context.Context,
req *protos.ShutdownRequest,
) (*protos.ShutdownResponse, error) {
logs := slog.Group("shutdown-log",
slog.String("flowName", req.FlowJobName),
slog.String("workflowId", req.WorkflowId),
)
err := h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
Expand All @@ -325,6 +329,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
shared.ShutdownSignal,
)
if err != nil {
slog.Error("unable to signal PeerFlow workflow",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to signal PeerFlow workflow: %v", err),
Expand All @@ -333,6 +341,10 @@ func (h *FlowRequestHandler) ShutdownFlow(

err = h.waitForWorkflowClose(ctx, req.WorkflowId)
if err != nil {
slog.Error("unable to wait for PeerFlow workflow to close",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for PeerFlow workflow to close: %v", err),
Expand All @@ -354,6 +366,9 @@ func (h *FlowRequestHandler) ShutdownFlow(
req, // workflow input
)
if err != nil {
slog.Error("unable to start DropFlow workflow",
logs,
slog.Any("error", err))
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to start DropFlow workflow: %v", err),
Expand All @@ -371,6 +386,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
select {
case err := <-errChan:
if err != nil {
slog.Error("DropFlow workflow did not execute successfully",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("DropFlow workflow did not execute successfully: %v", err),
Expand All @@ -379,6 +398,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
case <-time.After(1 * time.Minute):
err := h.handleWorkflowNotClosed(ctx, workflowID, "")
if err != nil {
slog.Error("unable to wait for DropFlow workflow to close",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for DropFlow workflow to close: %v", err),
Expand All @@ -389,6 +412,10 @@ func (h *FlowRequestHandler) ShutdownFlow(
if req.RemoveFlowEntry {
delErr := h.removeFlowEntryInCatalog(req.FlowJobName)
if delErr != nil {
slog.Error("unable to remove flow job entry",
slog.String("flowName", req.FlowJobName),
slog.Any("error", err),
slog.String("workflowId", req.WorkflowId))
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: err.Error(),
Expand Down

0 comments on commit b63f88d

Please sign in to comment.