Skip to content

Commit

Permalink
send heartbeats for cdc (#408)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Sep 21, 2023
1 parent 77f03aa commit c9a8830
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/lib/pq/oid"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
)

type PostgresCDCSource struct {
Expand Down Expand Up @@ -154,7 +155,10 @@ func (p *PostgresCDCSource) consumeStream(
if err != nil {
return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}
log.Infof("Sent Standby status message")

numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records))
activity.RecordHeartbeat(p.ctx, numRowsProcessedMessage)
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}

Expand Down

0 comments on commit c9a8830

Please sign in to comment.