From 4e51cece56958be2aeac272b4ade82ef16291fad Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 21 Sep 2023 12:29:14 -0700 Subject: [PATCH] send heartbeats for cdc --- flow/connectors/postgres/cdc.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 73e4d22393..b32efbbc9b 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -16,6 +16,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" log "github.com/sirupsen/logrus" + "go.temporal.io/sdk/activity" ) type PostgresCDCSource struct { @@ -154,7 +155,10 @@ func (p *PostgresCDCSource) consumeStream( if err != nil { return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } - log.Infof("Sent Standby status message") + + numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records)) + activity.RecordHeartbeat(p.ctx, numRowsProcessedMessage) + log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) }