diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index b89249031f..79b08c4312 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -198,7 +198,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)
 
-	shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
+	shutdown := utils.HeartbeatRoutine(ctx, func() string {
 		jobName := input.FlowConnectionConfigs.FlowJobName
 		return fmt.Sprintf("transferring records for job - %s", jobName)
 	})
@@ -383,7 +383,7 @@ func (a *FlowableActivity) StartNormalize(
 	}
 	defer connectors.CloseConnector(dstConn)
 
-	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
+	shutdown := utils.HeartbeatRoutine(ctx, func() string {
 		return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
 	})
 	defer shutdown()
@@ -452,7 +452,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 	}
 	defer connectors.CloseConnector(srcConn)
 
-	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
+	shutdown := utils.HeartbeatRoutine(ctx, func() string {
 		return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
 	})
 	defer shutdown()
@@ -590,7 +590,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 		}
 	}
 
-	shutdown := utils.HeartbeatRoutine(ctx, 1*time.Minute, func() string {
+	shutdown := utils.HeartbeatRoutine(ctx, func() string {
 		return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
 	})
 	defer shutdown()
@@ -633,7 +633,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
 		return err
 	}
 
-	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
+	shutdown := utils.HeartbeatRoutine(ctx, func() string {
 		return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
 	})
 	defer shutdown()
@@ -942,7 +942,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 		return nil
 	})
 
-	shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
+	shutdown := utils.HeartbeatRoutine(ctx, func() string {
 		return "syncing xmin."
 	})
 	defer shutdown()
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index bf784dd494..31717d706c 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -380,7 +380,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
 	stream *model.QRecordStream,
 	flowName string,
 ) (int, error) {
-	shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute,
+	shutdown := utils.HeartbeatRoutine(s.connector.ctx,
 		func() string {
 			return fmt.Sprintf("writing to avro stage for objectFolder %s and staging table %s",
 				objectFolder, stagingTable)
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 7a7ce63ba6..7f06e1b875 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -69,7 +69,7 @@ func (c *EventHubConnector) Close() error {
 		allErrors = errors.Join(allErrors, err)
 	}
 
-	err = c.hubManager.Close(context.Background())
+	err = c.hubManager.Close(c.ctx)
 	if err != nil {
 		c.logger.Error("failed to close event hub manager", slog.Any("error", err))
 		allErrors = errors.Join(allErrors, err)
@@ -132,7 +132,7 @@ func (c *EventHubConnector) processBatch(
 	lastUpdatedOffset := int64(0)
 
 	numRecords := atomic.Uint32{}
-	shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
+	shutdown := utils.HeartbeatRoutine(c.ctx, func() string {
 		return fmt.Sprintf(
 			"processed %d records for flow %s",
 			numRecords.Load(), flowJobName,
diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go
index 48a518b3d3..310044d659 100644
--- a/flow/connectors/eventhub/hubmanager.go
+++ b/flow/connectors/eventhub/hubmanager.go
@@ -7,6 +7,7 @@ import (
 	"log/slog"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
@@ -127,7 +128,11 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu
 func (m *EventHubManager) Close(ctx context.Context) error {
 	var allErrors error
 
-
+	numHubsClosed := atomic.Uint32{}
+	shutdown := utils.HeartbeatRoutine(ctx, func() string {
+		return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load())
+	})
+	defer shutdown()
 	m.hubs.Range(func(key any, value any) bool {
 		name := key.(ScopedEventhub)
 		hub := value.(*azeventhubs.ProducerClient)
@@ -136,6 +141,8 @@
 			slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
 			allErrors = errors.Join(allErrors, err)
 		}
+
+		numHubsClosed.Add(1)
 		return true
 	})
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 193208581a..7da6e1ea40 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -244,7 +244,7 @@ func (p *PostgresCDCSource) consumeStream(
 		}
 	}()
 
-	shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
+	shutdown := utils.HeartbeatRoutine(p.ctx, func() string {
 		jobName := p.flowJobName
 		currRecords := cdcRecordsStorage.Len()
 		return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index fd2aac3735..aa1b2b838c 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -578,7 +578,7 @@ func (c *PostgresConnector) GetTableSchema(
 			return nil, err
 		}
 		res[tableName] = tableSchema
-		utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
+		utils.RecordHeartbeat(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
 		c.logger.Info(fmt.Sprintf("fetched schema for table %s", tableName))
 	}
 
@@ -695,7 +695,7 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab
 		tableExistsMapping[tableIdentifier] = false
 		c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
-		utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
+		utils.RecordHeartbeat(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
 	}
 
 	err = createNormalizedTablesTx.Commit(c.ctx)
@@ -800,7 +800,7 @@ func (c *PostgresConnector) EnsurePullability(
 				},
 			},
 		}
-		utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
+		utils.RecordHeartbeat(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
 	}
 
 	return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index 627c2e2fcf..4d4950895e 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
-	"time"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/geo"
@@ -84,7 +83,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
 	q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)
 
 	if !qe.testEnv {
-		shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
+		shutdown := utils.HeartbeatRoutine(qe.ctx, func() string {
 			qe.logger.Info(fmt.Sprintf("still running '%s'...", q))
 			return fmt.Sprintf("running '%s'", q)
 		})
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index 47484a06f8..12cd9af3ab 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -279,7 +279,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
 	activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")
 	putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)
 
-	shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
+	shutdown := utils.HeartbeatRoutine(s.connector.ctx, func() string {
 		return fmt.Sprintf("putting file to stage %s", stage)
 	})
 	defer shutdown()
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 929b840c29..0509852d78 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -228,7 +228,7 @@ func (c *SnowflakeConnector) GetTableSchema(
 			return nil, err
 		}
 		res[tableName] = tableSchema
-		utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
+		utils.RecordHeartbeat(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
 	}
 
 	return &protos.GetTableSchemaBatchOutput{
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 743dcb6419..c2f3af94dd 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -8,7 +8,6 @@ import (
 	"log/slog"
 	"os"
 	"sync/atomic"
-	"time"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/model"
@@ -131,7 +130,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
 	numRows := atomic.Uint32{}
 
 	if p.ctx != nil {
-		shutdown := utils.HeartbeatRoutine(p.ctx, 30*time.Second, func() string {
+		shutdown := utils.HeartbeatRoutine(p.ctx, func() string {
 			written := numRows.Load()
 			return fmt.Sprintf("[avro] written %d rows to OCF", written)
 		})
diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go
index fda4ea06d2..201814c2c0 100644
--- a/flow/connectors/utils/heartbeat.go
+++ b/flow/connectors/utils/heartbeat.go
@@ -3,7 +3,6 @@ package utils
 import (
 	"context"
 	"fmt"
-	"log/slog"
 	"time"
 
 	"go.temporal.io/sdk/activity"
 )
@@ -11,22 +10,24 @@
 
 func HeartbeatRoutine(
 	ctx context.Context,
-	interval time.Duration,
 	message func() string,
 ) func() {
 	shutdown := make(chan struct{})
 	go func() {
 		counter := 0
+		ticker := time.NewTicker(15 * time.Second)
+		defer ticker.Stop()
+
 		for {
 			counter += 1
 			msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
-			RecordHeartbeatWithRecover(ctx, msg)
+			RecordHeartbeat(ctx, msg)
 			select {
 			case <-shutdown:
 				return
 			case <-ctx.Done():
 				return
-			case <-time.After(interval):
+			case <-ticker.C:
 			}
 		}
 	}()
@@ -35,12 +36,8 @@ func HeartbeatRoutine(
 
 // if the functions are being called outside the context of a Temporal workflow,
 // activity.RecordHeartbeat panics, this is a bandaid for that.
-func RecordHeartbeatWithRecover(ctx context.Context, details ...interface{}) {
-	defer func() {
-		if r := recover(); r != nil {
-			slog.Warn("ignoring panic from activity.RecordHeartbeat")
-			slog.Warn("this can happen when function is invoked outside of a Temporal workflow")
-		}
-	}()
-	activity.RecordHeartbeat(ctx, details...)
+func RecordHeartbeat(ctx context.Context, details ...interface{}) {
+	if activity.IsActivity(ctx) {
+		activity.RecordHeartbeat(ctx, details...)
+	}
 }
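
For reviewers, here is a minimal caller-side sketch of the API after this change. It is not part of the diff: the package, doLongRunningWork, and the processed counter are hypothetical; only the utils.HeartbeatRoutine and utils.RecordHeartbeat signatures come from heartbeat.go above. HeartbeatRoutine no longer takes an interval argument; every caller shares the fixed 15-second ticker, and the message closure is re-evaluated on each tick, which is why existing callers (eventhub processBatch, the avro writer, the new hubmanager Close) report progress through atomic counters.

package main // hypothetical harness, not part of this diff

import (
	"context"
	"fmt"
	"sync/atomic"

	"github.com/PeerDB-io/peer-flow/connectors/utils"
)

// doLongRunningWork is a hypothetical activity body showing the new call shape.
func doLongRunningWork(ctx context.Context) {
	var processed atomic.Uint32

	// No interval argument anymore: the heartbeat fires once immediately,
	// then every 15 seconds until shutdown() runs or ctx is cancelled.
	shutdown := utils.HeartbeatRoutine(ctx, func() string {
		return fmt.Sprintf("processed %d items", processed.Load())
	})
	defer shutdown()

	for i := 0; i < 1000; i++ {
		processed.Add(1) // the closure above observes this atomically
	}

	// RecordHeartbeat now no-ops outside a Temporal activity context
	// (activity.IsActivity reports false), so this is safe in plain tests.
	utils.RecordHeartbeat(ctx, "done")
}

func main() {
	doLongRunningWork(context.Background())
}

Two side effects of the refactor worth noting: swapping time.After(interval) for a stored time.Ticker avoids allocating a fresh timer on every loop iteration, and guarding with activity.IsActivity replaces the recover-based bandaid, so heartbeats outside an activity are silently skipped instead of panicking and logging.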