From f931bc02fc1a09f8389c55d920ffe11754fb146c Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj <65964360+Amogh-Bharadwaj@users.noreply.github.com> Date: Fri, 20 Dec 2024 01:48:49 +0530 Subject: [PATCH] Current LSN : Account for NULL LSN value being returned (#2372) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ```golang // ParseLSN parses the given XXX/XXX text format LSN used by PostgreSQL. func ParseLSN(s string) (LSN, error) { var upperHalf uint64 var lowerHalf uint64 var nparsed int nparsed, err := fmt.Sscanf(s, "%X/%X", &upperHalf, &lowerHalf) if err != nil { return 0, fmt.Errorf("failed to parse LSN: %w", err) } if nparsed != 2 { return 0, fmt.Errorf("failed to parsed LSN: %s", s) } return LSN((upperHalf << 32) + lowerHalf), nil } ``` It was recently seen for a read replica mirror that the above `pglogrepl` function we use to get the current LSN returned an EOF error at the first check - indicating that the LSN string we were giving it was not of the right format. Upon research it seems like the functions in the query we run: ```sql SELECT CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END ``` can return `NULL` as seen [here](https://github.com/postgres/postgres/blob/1f0de66ea2a5549a3768c67434e28a136c280571/src/backend/access/transam/xlogfuncs.c#L338) for example. It seems that this can occur when there is no more WAL activity to collect, not sure This PR simply adds more logging around this. --------- Co-authored-by: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Co-authored-by: Philip Dubé --- flow/connectors/postgres/client.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 172a45c6b..4816f1dd4 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -657,9 +657,16 @@ func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, e var result pgtype.Text err := row.Scan(&result) if err != nil { - return 0, fmt.Errorf("error while running query: %w", err) + return 0, fmt.Errorf("error while running query for current LSN: %w", err) } - return pglogrepl.ParseLSN(result.String) + if !result.Valid || result.String == "" { + return 0, errors.New("error while getting current LSN: no LSN available") + } + lsn, err := pglogrepl.ParseLSN(result.String) + if err != nil { + return 0, fmt.Errorf("error while parsing LSN %s: %w", result.String, err) + } + return lsn, nil } func (c *PostgresConnector) getDefaultPublicationName(jobName string) string {