diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index aeb44f36c6..00b4e7b825 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -17,6 +17,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/log" + "go.temporal.io/sdk/temporal" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" @@ -149,7 +150,7 @@ func (c *PostgresConnector) MaybeStartReplication( c.replState.Slot, slotName, c.replState.Publication, publicationName, c.replState.Offset, req.LastOffset, ) c.logger.Info(msg) - return errors.New(msg) + return temporal.NewNonRetryableApplicationError(msg, "desync", nil) } if c.replState == nil { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index c93e1415e5..ec89504fd9 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -539,9 +539,7 @@ func CDCFlowWorkflowWithConfig( totalRecordsSynced += childSyncFlowRes.NumRecordsSynced w.logger.Info("Total records synced: ", slog.Int64("totalRecordsSynced", totalRecordsSynced)) - } - if childSyncFlowRes != nil { tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas) // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.