Skip to content

Commit

Permalink
Fix normalize heartbeats (#1223)
Browse files Browse the repository at this point in the history
Was seeing activity heartbeat timeouts during destination table setup
for a large number of tables. This PR attempts to fix that

---------

Co-authored-by: Kaushik Iska <[email protected]>
  • Loading branch information
Amogh-Bharadwaj and iskakaushik authored Feb 7, 2024
1 parent 5f8e7d1 commit 14dc0be
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"regexp"
"strings"
"sync/atomic"
"time"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -630,6 +631,15 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
func (c *BigQueryConnector) SetupNormalizedTables(
req *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
numTablesSetup := atomic.Uint32{}
totalTables := uint32(len(req.TableNameSchemaMapping))

shutdown := utils.HeartbeatRoutine(c.ctx, func() string {
return fmt.Sprintf("setting up normalized tables - %d of %d done",
numTablesSetup.Load(), totalTables)
})
defer shutdown()

tableExistsMapping := make(map[string]bool)
datasetTablesSet := make(map[datasetTable]struct{})
for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
Expand Down Expand Up @@ -666,6 +676,9 @@ func (c *BigQueryConnector) SetupNormalizedTables(
// table exists, go to next table
tableExistsMapping[tableIdentifier] = true
datasetTablesSet[*datasetTable] = struct{}{}

c.logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier))
numTablesSetup.Add(1)
continue
}

Expand Down Expand Up @@ -738,7 +751,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
datasetTablesSet[*datasetTable] = struct{}{}
// log that table was created
c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
utils.RecordHeartbeat(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
numTablesSetup.Add(1)
}

return &protos.SetupNormalizedTableBatchOutput{
Expand Down

0 comments on commit 14dc0be

Please sign in to comment.