Skip to content

Commit

Permalink
propagate errors from cloneTables
Browse files Browse the repository at this point in the history
also format mirror_status.go & remove nolint:all from geo.go
  • Loading branch information
serprex committed Jan 5, 2024
1 parent 09b7b35 commit 02f4664
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
3 changes: 2 additions & 1 deletion flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID s
}

func (h *FlowRequestHandler) updateWorkflowStatus(ctx context.Context,
workflowID string, state *protos.FlowStatus) error {
workflowID string, state *protos.FlowStatus,
) error {
_, err := h.temporalClient.UpdateWorkflow(ctx, workflowID, "", shared.FlowStatusUpdate, state)
if err != nil {
return fmt.Errorf("failed to update state in workflow with ID %s: %w", workflowID, err)
Expand Down
1 change: 0 additions & 1 deletion flow/geo/geo.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:all
package geo

import (
Expand Down
8 changes: 6 additions & 2 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot(
}

logger.Info("cloning tables in parallel: ", numTablesInParallel)
s.cloneTables(ctx, slotInfo, numTablesInParallel)
if err := s.cloneTables(ctx, slotInfo, numTablesInParallel); err != nil {
return fmt.Errorf("failed to clone tables: %w", err)
}

if err := s.closeSlotKeepAlive(sessionCtx); err != nil {
return fmt.Errorf("failed to close slot keep alive: %w", err)
Expand Down Expand Up @@ -276,7 +278,9 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon
SlotName: "peerdb_initial_copy_only",
SnapshotName: "", // empty snapshot name indicates that we should not use a snapshot
}
se.cloneTables(ctx, slotInfo, int(config.SnapshotNumTablesInParallel))
if err := se.cloneTables(ctx, slotInfo, int(config.SnapshotNumTablesInParallel)); err != nil {
return fmt.Errorf("failed to clone tables: %w", err)
}
return nil
}

Expand Down

0 comments on commit 02f4664

Please sign in to comment.