diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index e6d80798fd..576010babb 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -9,6 +9,7 @@ import ( "reflect" "regexp" "strings" + "sync/atomic" "time" "cloud.google.com/go/bigquery" @@ -630,6 +631,15 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr func (c *BigQueryConnector) SetupNormalizedTables( req *protos.SetupNormalizedTableBatchInput, ) (*protos.SetupNormalizedTableBatchOutput, error) { + numTablesSetup := atomic.Uint32{} + totalTables := uint32(len(req.TableNameSchemaMapping)) + + shutdown := utils.HeartbeatRoutine(c.ctx, func() string { + return fmt.Sprintf("setting up normalized tables - %d of %d done", + numTablesSetup.Load(), totalTables) + }) + defer shutdown() + tableExistsMapping := make(map[string]bool) datasetTablesSet := make(map[datasetTable]struct{}) for tableIdentifier, tableSchema := range req.TableNameSchemaMapping { @@ -666,6 +676,9 @@ func (c *BigQueryConnector) SetupNormalizedTables( // table exists, go to next table tableExistsMapping[tableIdentifier] = true datasetTablesSet[*datasetTable] = struct{}{} + + c.logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier)) + numTablesSetup.Add(1) continue } @@ -738,7 +751,7 @@ func (c *BigQueryConnector) SetupNormalizedTables( datasetTablesSet[*datasetTable] = struct{}{} // log that table was created c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier)) - utils.RecordHeartbeat(c.ctx, fmt.Sprintf("created table %s", tableIdentifier)) + numTablesSetup.Add(1) } return &protos.SetupNormalizedTableBatchOutput{