diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 73e4d22393..b32efbbc9b 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -16,6 +16,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" log "github.com/sirupsen/logrus" + "go.temporal.io/sdk/activity" ) type PostgresCDCSource struct { @@ -154,7 +155,10 @@ func (p *PostgresCDCSource) consumeStream( if err != nil { return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } - log.Infof("Sent Standby status message") + + numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records)) + activity.RecordHeartbeat(p.ctx, numRowsProcessedMessage) + log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) }