diff --git a/flow/go.mod b/flow/go.mod index f0bb33c999..5dcb66e440 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -15,7 +15,6 @@ require ( github.com/google/uuid v1.4.0 github.com/grafana/pyroscope-go v1.0.4 github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 - github.com/hashicorp/go-multierror v1.1.1 github.com/jackc/pglogrepl v0.0.0-20231111135425-1627ab1b5780 github.com/jackc/pgx/v5 v5.5.1 github.com/jmoiron/sqlx v1.3.5 @@ -124,7 +123,6 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect - github.com/hashicorp/errwrap v1.1.0 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect diff --git a/flow/go.sum b/flow/go.sum index 977cb09638..c2145c93e2 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -241,11 +241,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8k github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= -github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= -github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= -github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 4e0f760223..3e6cce1030 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -9,7 +9,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" - "github.com/hashicorp/go-multierror" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -50,9 +49,9 @@ type CDCFlowWorkflowState struct { // SnapshotComplete indicates whether the initial snapshot workflow has completed. SnapshotComplete bool // Errors encountered during child sync flow executions. - SyncFlowErrors error + SyncFlowErrors []string // Errors encountered during child sync flow executions. - NormalizeFlowErrors error + NormalizeFlowErrors []string // Global mapping of relation IDs to RelationMessages sent as a part of logical replication. // Needed to support schema changes. RelationMessageMapping *model.RelationMessageMapping @@ -79,7 +78,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState { } // truncate the progress and other arrays to a max of 10 elements -func (s *CDCFlowWorkflowState) TruncateProgress() { +func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) { if len(s.Progress) > 10 { s.Progress = s.Progress[len(s.Progress)-10:] } @@ -91,12 +90,12 @@ func (s *CDCFlowWorkflowState) TruncateProgress() { } if s.SyncFlowErrors != nil { - fmt.Println("SyncFlowErrors: ", s.SyncFlowErrors) + logger.Warn("SyncFlowErrors: ", s.SyncFlowErrors) s.SyncFlowErrors = nil } if s.NormalizeFlowErrors != nil { - fmt.Println("NormalizeFlowErrors: ", s.NormalizeFlowErrors) + logger.Warn("NormalizeFlowErrors: ", s.NormalizeFlowErrors) s.NormalizeFlowErrors = nil } } @@ -373,7 +372,7 @@ func CDCFlowWorkflowWithConfig( var childSyncFlowRes *model.SyncResponse if err := childSyncFlowFuture.Get(ctx, &childSyncFlowRes); err != nil { w.logger.Error("failed to execute sync flow: ", err) - state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err) + state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } else { state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) if childSyncFlowRes != nil { @@ -426,7 +425,7 @@ func CDCFlowWorkflowWithConfig( var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { w.logger.Error("failed to execute schema update at source: ", err) - state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err) + state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } else { for i := range modifiedSrcTables { cfg.TableNameSchemaMapping[modifiedDstTables[i]] = @@ -446,7 +445,7 @@ func CDCFlowWorkflowWithConfig( var childNormalizeFlowRes *model.NormalizeResponse if err := f.Get(ctx, &childNormalizeFlowRes); err != nil { w.logger.Error("failed to execute normalize flow: ", err) - state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err) + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) } else { state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) } @@ -455,6 +454,6 @@ func CDCFlowWorkflowWithConfig( batchSizeSelector.Select(ctx) } - state.TruncateProgress() + state.TruncateProgress(w.logger) return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) }