Skip to content

Commit

Permalink
getCurrentLsn should be able to return null to caller without error (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jan 20, 2025
1 parent d05f7c4 commit 3168c85
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
21 changes: 13 additions & 8 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ const (
AND application_name LIKE 'peerdb%' AND client_addr IS NOT NULL`
)

type ReplicaIdentityType rune
type (
ReplicaIdentityType rune
NullableLSN struct {
pglogrepl.LSN
Null bool
}
)

const (
ReplicaIdentityDefault ReplicaIdentityType = 'd'
Expand Down Expand Up @@ -651,22 +657,21 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(
return resultMap, nil
}

func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, error) {
func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (NullableLSN, error) {
row := c.conn.QueryRow(ctx,
"SELECT CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END")
var result pgtype.Text
err := row.Scan(&result)
if err != nil {
return 0, fmt.Errorf("error while running query for current LSN: %w", err)
if err := row.Scan(&result); err != nil {
return NullableLSN{}, fmt.Errorf("error while running query for current LSN: %w", err)
}
if !result.Valid || result.String == "" {
return 0, errors.New("error while getting current LSN: no LSN available")
return NullableLSN{Null: true}, nil
}
lsn, err := pglogrepl.ParseLSN(result.String)
if err != nil {
return 0, fmt.Errorf("error while parsing LSN %s: %w", result.String, err)
return NullableLSN{}, fmt.Errorf("error while parsing LSN %s: %w", result.String, err)
}
return lsn, nil
return NullableLSN{LSN: lsn}, nil
}

func (c *PostgresConnector) getDefaultPublicationName(jobName string) string {
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,12 +433,12 @@ func pullCore[Items model.Items](
return err
}

latestLSN, err := c.getCurrentLSN(ctx)
if err != nil {
// There are cases where the pg_is_in_recovery() returns null - in read replicas for instance
// Since this is just a monitoring metric, we can ignore the error
// Since this is just a monitoring metric, we can ignore errors about LSN
if latestLSN, err := c.getCurrentLSN(ctx); err != nil {
c.logger.Error("error getting current LSN", slog.Any("error", err))
} else if err := monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN)); err != nil {
} else if latestLSN.Null {
c.logger.Info("Current LSN null, probably read replica starting up")
} else if err := monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN.LSN)); err != nil {
c.logger.Error("error updating latest LSN at source for CDC flow", slog.Any("error", err))
}

Expand Down

0 comments on commit 3168c85

Please sign in to comment.