diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index b90e9d779b..a99142c42a 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -156,6 +156,16 @@ func (p *PostgresCDCSource) consumeStream( var standByLastLogged time.Time + shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string { + jobName := req.FlowJobName + currRecords := len(records.Records) + return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords) + }) + + defer func() { + shutdown <- true + }() + for { if time.Now().After(nextStandbyMessageDeadline) || (len(records.Records) >= int(req.MaxBatchSize)) { diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index 9809a41b6b..b16735b6bb 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -19,7 +19,7 @@ func HeartbeatRoutine( go func() { for { msg := fmt.Sprintf("heartbeat #%d: %s", counter, message()) - activity.RecordHeartbeat(ctx, msg) + RecordHeartbeatWithRecover(ctx, msg) counter += 1 to := time.After(interval) select { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 3969067664..d59de7cf69 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -83,9 +83,7 @@ func (s *SyncFlowExecution) executeSyncFlow( startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, - // TODO: activity needs to call heartbeat. - // see https://github.com/PeerDB-io/nexus/issues/216 - HeartbeatTimeout: 24 * time.Hour, + HeartbeatTimeout: 5 * time.Minute, }) // execute StartFlow on the peers to start the flow