Skip to content

Commit

Permalink
more frequent heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Feb 7, 2024
1 parent 4d9f740 commit 3e5c4b9
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"regexp"
"strings"
"sync/atomic"
"time"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -815,6 +816,15 @@ func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) {
func (c *BigQueryConnector) SetupNormalizedTables(
req *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
numTablesSetup := atomic.Uint32{}
totalTables := uint32(len(req.TableNameSchemaMapping))

shutdown := utils.HeartbeatRoutine(c.ctx, func() string {
return fmt.Sprintf("setting up normalized tables - %d of %d done",
numTablesSetup.Load(), totalTables)
})
defer shutdown()

tableExistsMapping := make(map[string]bool)
datasetTablesSet := make(map[datasetTable]struct{})
for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
Expand Down Expand Up @@ -908,7 +918,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
datasetTablesSet[*datasetTable] = struct{}{}
// log that table was created
c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
numTablesSetup.Add(1)
}

return &protos.SetupNormalizedTableBatchOutput{
Expand Down

0 comments on commit 3e5c4b9

Please sign in to comment.