diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 65f9886f51..19c818b91e 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -280,7 +280,8 @@ func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID s } func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context, - workflowID string, state *protos.FlowStatus) error { + workflowID string, state *protos.FlowStatus, +) error { _, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state) if err != nil { return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, err) diff --git a/flow/geo/geo.go b/flow/geo/geo.go index a7f87e0174..d0479da3eb 100644 --- a/flow/geo/geo.go +++ b/flow/geo/geo.go @@ -1,4 +1,3 @@ -//nolint:all package geo import ( diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index addea52666..91a5a4dbff 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -234,7 +234,9 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( } logger.Info("cloning tables in parallel: ", numTablesInParallel) - s.cloneTables(ctx, slotInfo, numTablesInParallel) + if err := s.cloneTables(ctx, slotInfo, numTablesInParallel); err != nil { + return fmt.Errorf("failed to clone tables: %w", err) + } if err := s.closeSlotKeepAlive(sessionCtx); err != nil { return fmt.Errorf("failed to close slot keep alive: %w", err) @@ -276,7 +278,9 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon SlotName: "peerdb_initial_copy_only", SnapshotName: "", // empty snapshot name indicates that we should not use a snapshot } - se.cloneTables(ctx, slotInfo, int(config.SnapshotNumTablesInParallel)) + if err := se.cloneTables(ctx, slotInfo, int(config.SnapshotNumTablesInParallel)); err != nil { + return fmt.Errorf("failed to clone tables: %w", err) + } return nil }