Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove multierror to fix error in error handling #833

Merged
merged 1 commit into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ require (
github.com/google/uuid v1.4.0
github.com/grafana/pyroscope-go v1.0.4
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1
github.com/hashicorp/go-multierror v1.1.1
github.com/jackc/pglogrepl v0.0.0-20231111135425-1627ab1b5780
github.com/jackc/pgx/v5 v5.5.1
github.com/jmoiron/sqlx v1.3.5
Expand Down Expand Up @@ -124,7 +123,6 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
Expand Down
5 changes: 0 additions & 5 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8k
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
Expand Down
19 changes: 9 additions & 10 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -50,9 +49,9 @@ type CDCFlowWorkflowState struct {
// SnapshotComplete indicates whether the initial snapshot workflow has completed.
SnapshotComplete bool
// Errors encountered during child sync flow executions.
SyncFlowErrors error
SyncFlowErrors []string
// Errors encountered during child sync flow executions.
NormalizeFlowErrors error
NormalizeFlowErrors []string
// Global mapping of relation IDs to RelationMessages sent as a part of logical replication.
// Needed to support schema changes.
RelationMessageMapping *model.RelationMessageMapping
Expand All @@ -79,7 +78,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
}

// truncate the progress and other arrays to a max of 10 elements
func (s *CDCFlowWorkflowState) TruncateProgress() {
func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) {
if len(s.Progress) > 10 {
s.Progress = s.Progress[len(s.Progress)-10:]
}
Expand All @@ -91,12 +90,12 @@ func (s *CDCFlowWorkflowState) TruncateProgress() {
}

if s.SyncFlowErrors != nil {
fmt.Println("SyncFlowErrors: ", s.SyncFlowErrors)
logger.Warn("SyncFlowErrors: ", s.SyncFlowErrors)
s.SyncFlowErrors = nil
}

if s.NormalizeFlowErrors != nil {
fmt.Println("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
logger.Warn("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
s.NormalizeFlowErrors = nil
}
}
Expand Down Expand Up @@ -373,7 +372,7 @@ func CDCFlowWorkflowWithConfig(
var childSyncFlowRes *model.SyncResponse
if err := childSyncFlowFuture.Get(ctx, &childSyncFlowRes); err != nil {
w.logger.Error("failed to execute sync flow: ", err)
state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err)
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
} else {
state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes)
if childSyncFlowRes != nil {
Expand Down Expand Up @@ -426,7 +425,7 @@ func CDCFlowWorkflowWithConfig(
var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
w.logger.Error("failed to execute schema update at source: ", err)
state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err)
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
} else {
for i := range modifiedSrcTables {
cfg.TableNameSchemaMapping[modifiedDstTables[i]] =
Expand All @@ -446,7 +445,7 @@ func CDCFlowWorkflowWithConfig(
var childNormalizeFlowRes *model.NormalizeResponse
if err := f.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes)
}
Expand All @@ -455,6 +454,6 @@ func CDCFlowWorkflowWithConfig(
batchSizeSelector.Select(ctx)
}

state.TruncateProgress()
state.TruncateProgress(w.logger)
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
}
Loading