diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 8bbee11bcf..5011845a23 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -157,6 +157,7 @@ func (a *FlowableActivity) CreateNormalizedTable( // StartFlow implements StartFlow. func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) { + activity.RecordHeartbeat(ctx, "starting flow...") conn := input.FlowConnectionConfigs ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics) @@ -180,6 +181,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, if err != nil { return nil, fmt.Errorf("failed to initialize table schema: %w", err) } + activity.RecordHeartbeat(ctx, "initialized table schema") log.WithFields(log.Fields{ "flowName": input.FlowConnectionConfigs.FlowJobName, @@ -202,6 +204,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, fmt.Errorf("failed to pull records: %w", err) } recordBatch := recordsWithTableSchemaDelta.RecordBatch + + pullRecordWithCount := fmt.Sprintf("pulled %d records", len(recordBatch.Records)) + activity.RecordHeartbeat(ctx, pullRecordWithCount) + if a.CatalogMirrorMonitor.IsActive() && len(recordBatch.Records) > 0 { syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName) if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB { @@ -238,6 +244,15 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, }, nil } + shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string { + jobName := input.FlowConnectionConfigs.FlowJobName + return fmt.Sprintf("pushing records for job - %s", jobName) + }) + + defer func() { + shutdown <- true + }() + syncStartTime := time.Now() res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{ Records: recordBatch, @@ -275,7 +290,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } res.TableSchemaDelta = recordsWithTableSchemaDelta.TableSchemaDelta res.RelationMessageMapping = recordsWithTableSchemaDelta.RelationMessageMapping - activity.RecordHeartbeat(ctx, "pushed records") + + pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) + activity.RecordHeartbeat(ctx, pushedRecordsWithCount) return res, nil } diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 6661d6ecc4..fe7a285838 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -85,7 +85,7 @@ func (s *SyncFlowExecution) executeSyncFlow( StartToCloseTimeout: 24 * time.Hour, // TODO: activity needs to call heartbeat. // see https://github.com/PeerDB-io/nexus/issues/216 - HeartbeatTimeout: 5 * time.Minute, + HeartbeatTimeout: 30 * time.Second, }) // execute StartFlow on the peers to start the flow