From 44148afa8f71a8b5a9b12ef8734176086e768dd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 15 Feb 2024 23:10:36 +0000 Subject: [PATCH] more logging, counting --- flow/connectors/postgres/cdc.go | 4 +++- flow/connectors/postgres/postgres.go | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 12d65d8a30..1ff14b3e27 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -170,10 +170,12 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco pkmRequiresResponse := false waitingForCommit := false + iii := 0 for { + iii++ _, err := p.getCurrentLSN(ctx) if err != nil { - return fmt.Errorf("!!! current LSN: %w", err) + return fmt.Errorf("!!! current LSN %d: %w", iii, err) } if pkmRequiresResponse { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 9c0e9ee854..206ff45b3a 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -348,6 +348,11 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo return err } + _, err = c.getCurrentLSN(ctx) + if err != nil { + return fmt.Errorf("=== current LSN: %w", err) + } + cdc, err := c.NewPostgresCDCSource(&PostgresCDCConfig{ SrcTableIDNameMapping: req.SrcTableIDNameMapping, Slot: slotName,