From 3e5c4b96d54b2a74cd7cdf6ed98eb9f9adad1e1e Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 7 Feb 2024 11:58:15 -0800 Subject: [PATCH] more frequent heartbeats --- flow/connectors/bigquery/bigquery.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index efbe8d0d4f..01e769b7b8 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -9,6 +9,7 @@ import ( "reflect" "regexp" "strings" + "sync/atomic" "time" "cloud.google.com/go/bigquery" @@ -815,6 +816,15 @@ func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) { func (c *BigQueryConnector) SetupNormalizedTables( req *protos.SetupNormalizedTableBatchInput, ) (*protos.SetupNormalizedTableBatchOutput, error) { + numTablesSetup := atomic.Uint32{} + totalTables := uint32(len(req.TableNameSchemaMapping)) + + shutdown := utils.HeartbeatRoutine(c.ctx, func() string { + return fmt.Sprintf("setting up normalized tables - %d of %d done", + numTablesSetup.Load(), totalTables) + }) + defer shutdown() + tableExistsMapping := make(map[string]bool) datasetTablesSet := make(map[datasetTable]struct{}) for tableIdentifier, tableSchema := range req.TableNameSchemaMapping { @@ -908,7 +918,7 @@ func (c *BigQueryConnector) SetupNormalizedTables( datasetTablesSet[*datasetTable] = struct{}{} // log that table was created c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier)) - utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier)) + numTablesSetup.Add(1) } return &protos.SetupNormalizedTableBatchOutput{