From c9a88302a2cdc479d92d2617f24c1c6a19960479 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 21 Sep 2023 12:45:40 -0700 Subject: [PATCH] send heartbeats for cdc (#408) --- flow/connectors/postgres/cdc.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 73e4d22393..b32efbbc9b 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -16,6 +16,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" log "github.com/sirupsen/logrus" + "go.temporal.io/sdk/activity" ) type PostgresCDCSource struct { @@ -154,7 +155,10 @@ func (p *PostgresCDCSource) consumeStream( if err != nil { return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } - log.Infof("Sent Standby status message") + + numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records)) + activity.RecordHeartbeat(p.ctx, numRowsProcessedMessage) + log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) }