From e340e460f97f0000268c771442a807162f186321 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 22 Oct 2023 17:43:07 -0400 Subject: [PATCH] sundry cdc fixes (#557) --- flow/connectors/postgres/cdc.go | 9 ++++++++- flow/workflows/sync_flow.go | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 9bfff285ae..142f353146 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -154,6 +154,8 @@ func (p *PostgresCDCSource) consumeStream( consumedXLogPos = clientXLogPos - 1 } + var standByLastLogged time.Time + for { if time.Now().After(nextStandbyMessageDeadline) || (len(records.Records) >= int(req.MaxBatchSize)) { @@ -167,7 +169,12 @@ func (p *PostgresCDCSource) consumeStream( numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records)) utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage) - log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) + + if time.Since(standByLastLogged) > 10*time.Second { + log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) + standByLastLogged = time.Now() + } + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) if !p.commitLock && (len(records.Records) >= int(req.MaxBatchSize)) { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index d63c57d1ed..3969067664 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -82,10 +82,10 @@ func (s *SyncFlowExecution) executeSyncFlow( } startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 24 * time.Hour, + StartToCloseTimeout: 72 * time.Hour, // TODO: activity needs to call heartbeat. // see https://github.com/PeerDB-io/nexus/issues/216 - HeartbeatTimeout: 30 * time.Second, + HeartbeatTimeout: 24 * time.Hour, }) // execute StartFlow on the peers to start the flow