Skip to content

Commit

Permalink
Remove multierror to fix error in error handling (#833)
Browse files Browse the repository at this point in the history
multierror struct has a formatting closure, making it unserializable
Thus, multierror cannot be stored in temporal state

Replace with `[]string`
  • Loading branch information
serprex authored Dec 16, 2023
1 parent 4d0d18a commit c196e6c
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 17 deletions.
2 changes: 0 additions & 2 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ require (
github.com/google/uuid v1.4.0
github.com/grafana/pyroscope-go v1.0.4
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1
github.com/hashicorp/go-multierror v1.1.1
github.com/jackc/pglogrepl v0.0.0-20231111135425-1627ab1b5780
github.com/jackc/pgx/v5 v5.5.1
github.com/jmoiron/sqlx v1.3.5
Expand Down Expand Up @@ -124,7 +123,6 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
Expand Down
5 changes: 0 additions & 5 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8k
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
Expand Down
19 changes: 9 additions & 10 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -50,9 +49,9 @@ type CDCFlowWorkflowState struct {
// SnapshotComplete indicates whether the initial snapshot workflow has completed.
SnapshotComplete bool
// Errors encountered during child sync flow executions.
SyncFlowErrors error
SyncFlowErrors []string
// Errors encountered during child sync flow executions.
NormalizeFlowErrors error
NormalizeFlowErrors []string
// Global mapping of relation IDs to RelationMessages sent as a part of logical replication.
// Needed to support schema changes.
RelationMessageMapping *model.RelationMessageMapping
Expand All @@ -79,7 +78,7 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
}

// truncate the progress and other arrays to a max of 10 elements
func (s *CDCFlowWorkflowState) TruncateProgress() {
func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) {
if len(s.Progress) > 10 {
s.Progress = s.Progress[len(s.Progress)-10:]
}
Expand All @@ -91,12 +90,12 @@ func (s *CDCFlowWorkflowState) TruncateProgress() {
}

if s.SyncFlowErrors != nil {
fmt.Println("SyncFlowErrors: ", s.SyncFlowErrors)
logger.Warn("SyncFlowErrors: ", s.SyncFlowErrors)
s.SyncFlowErrors = nil
}

if s.NormalizeFlowErrors != nil {
fmt.Println("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
logger.Warn("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
s.NormalizeFlowErrors = nil
}
}
Expand Down Expand Up @@ -373,7 +372,7 @@ func CDCFlowWorkflowWithConfig(
var childSyncFlowRes *model.SyncResponse
if err := childSyncFlowFuture.Get(ctx, &childSyncFlowRes); err != nil {
w.logger.Error("failed to execute sync flow: ", err)
state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err)
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
} else {
state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes)
if childSyncFlowRes != nil {
Expand Down Expand Up @@ -426,7 +425,7 @@ func CDCFlowWorkflowWithConfig(
var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
w.logger.Error("failed to execute schema update at source: ", err)
state.SyncFlowErrors = multierror.Append(state.SyncFlowErrors, err)
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
} else {
for i := range modifiedSrcTables {
cfg.TableNameSchemaMapping[modifiedDstTables[i]] =
Expand All @@ -446,7 +445,7 @@ func CDCFlowWorkflowWithConfig(
var childNormalizeFlowRes *model.NormalizeResponse
if err := f.Get(ctx, &childNormalizeFlowRes); err != nil {
w.logger.Error("failed to execute normalize flow: ", err)
state.NormalizeFlowErrors = multierror.Append(state.NormalizeFlowErrors, err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes)
}
Expand All @@ -455,6 +454,6 @@ func CDCFlowWorkflowWithConfig(
batchSizeSelector.Select(ctx)
}

state.TruncateProgress()
state.TruncateProgress(w.logger)
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
}

0 comments on commit c196e6c

Please sign in to comment.