From 017f8635259090f8cd0bcc730baa9d702a226b17 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 12 Dec 2023 14:56:10 -0500 Subject: [PATCH] add wait for table --- flow/connectors/bigquery/bigquery.go | 23 ++++++++++++++++++++++ flow/connectors/bigquery/qrep_avro_sync.go | 10 ++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index b40c1ed765..eb6ac8d198 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -233,6 +233,29 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc return nil } +func (c *BigQueryConnector) WaitForTableReady(tblName string) error { + table := c.client.Dataset(c.datasetID).Table(tblName) + maxDuration := 5 * time.Minute + deadline := time.Now().Add(maxDuration) + sleepInterval := 15 * time.Second + attempt := 0 + + for { + if time.Now().After(deadline) { + return fmt.Errorf("timeout reached while waiting for table %s to be ready", tblName) + } + + _, err := table.Metadata(c.ctx) + if err == nil { + return nil + } + + slog.Info("waiting for table to be ready", slog.String("table", tblName), slog.Int("attempt", attempt)) + attempt++ + time.Sleep(sleepInterval) + } +} + // ReplayTableSchemaDeltas changes a destination table to match the schema at source // This could involve adding or dropping multiple columns. func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string, diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 25ed99a8bf..d60577ec74 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -394,13 +394,15 @@ func (s *QRepAvroSyncMethod) writeToStage( return 0, fmt.Errorf("failed to wait for BigQuery load job: %w", err) } - if len(status.Errors) > 0 { - return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %v", status.Errors) - } - if err := status.Err(); err != nil { return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err) } slog.Info(fmt.Sprintf("Pushed into %s/%s", avroFile.FilePath, syncID)) + + err = s.connector.WaitForTableReady(stagingTable) + if err != nil { + return 0, fmt.Errorf("failed to wait for table to be ready: %w", err) + } + return avroFile.NumRecords, nil }