From a989caba33304bb75e49571fa2f7bf9c702ca9c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sun, 25 Feb 2024 16:31:59 +0000 Subject: [PATCH] Use NewNonRetryableApplicationError when replState mismatch occurs (#1374) Otherwise activity retries & continually hits same error --- flow/connectors/postgres/postgres.go | 3 ++- flow/workflows/cdc_flow.go | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index aeb44f36c6..00b4e7b825 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -17,6 +17,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/log" + "go.temporal.io/sdk/temporal" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" @@ -149,7 +150,7 @@ func (c *PostgresConnector) MaybeStartReplication( c.replState.Slot, slotName, c.replState.Publication, publicationName, c.replState.Offset, req.LastOffset, ) c.logger.Info(msg) - return errors.New(msg) + return temporal.NewNonRetryableApplicationError(msg, "desync", nil) } if c.replState == nil { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index c93e1415e5..ec89504fd9 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -539,9 +539,7 @@ func CDCFlowWorkflowWithConfig( totalRecordsSynced += childSyncFlowRes.NumRecordsSynced w.logger.Info("Total records synced: ", slog.Int64("totalRecordsSynced", totalRecordsSynced)) - } - if childSyncFlowRes != nil { tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas) // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.