Skip to content

Commit

Permalink
also lower wait time for table ready
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 23, 2023
1 parent 2cc80cc commit 152fa9a
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 4 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,11 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc
return nil
}

func (c *BigQueryConnector) WaitForTableReady(tblName string) error {
func (c *BigQueryConnector) waitForTableReady(tblName string) error {
table := c.client.Dataset(c.datasetID).Table(tblName)
maxDuration := 5 * time.Minute
deadline := time.Now().Add(maxDuration)
sleepInterval := 15 * time.Second
sleepInterval := 5 * time.Second
attempt := 0

for {
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
slog.String("batchOrPartitionID", syncID),
)
if s.gcsBucket != "" {

bucket := s.connector.storageClient.Bucket(s.gcsBucket)
avroFilePath := fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
obj := bucket.Object(avroFilePath)
Expand Down Expand Up @@ -415,7 +414,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
}
slog.Info(fmt.Sprintf("Pushed into %s/%s", avroFile.FilePath, syncID))

err = s.connector.WaitForTableReady(stagingTable)
err = s.connector.waitForTableReady(stagingTable)
if err != nil {
return 0, fmt.Errorf("failed to wait for table to be ready: %w", err)
}
Expand Down

0 comments on commit 152fa9a

Please sign in to comment.