diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index aa2e7750b5..587ec456db 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -227,6 +227,29 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc
 	return nil
 }
 
+func (c *BigQueryConnector) WaitForTableReady(tblName string) error {
+	table := c.client.Dataset(c.datasetID).Table(tblName)
+	maxDuration := 5 * time.Minute
+	deadline := time.Now().Add(maxDuration)
+	sleepInterval := 15 * time.Second
+	attempt := 0
+
+	for {
+		if time.Now().After(deadline) {
+			return fmt.Errorf("timeout reached while waiting for table %s to be ready", tblName)
+		}
+
+		_, err := table.Metadata(c.ctx)
+		if err == nil {
+			return nil
+		}
+
+		log.Infof("waiting for table %s to be ready, attempt %d", tblName, attempt)
+		attempt++
+		time.Sleep(sleepInterval)
+	}
+}
+
 // ReplayTableSchemaDeltas changes a destination table to match the schema at source
 // This could involve adding or dropping multiple columns.
 func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index 129becc01c..bbe14e396c 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -402,14 +402,15 @@ func (s *QRepAvroSyncMethod) writeToStage(
 		return 0, fmt.Errorf("failed to wait for BigQuery load job: %w", err)
 	}
 
-	if len(status.Errors) > 0 {
-		return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %v", status.Errors)
-	}
-
 	if err := status.Err(); err != nil {
 		return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
 	}
 
+	err = s.connector.WaitForTableReady(stagingTable)
+	if err != nil {
+		return 0, fmt.Errorf("failed to wait for table to be ready: %w", err)
+	}
+
 	log.Infof("Pushed into %s/%s", avroFile.FilePath, syncID)
 	return avroFile.NumRecords, nil
 }
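
For reference, a minimal, self-contained sketch of the same poll-until-metadata-succeeds pattern that the new `WaitForTableReady` method uses, written against the public `cloud.google.com/go/bigquery` client. The project, dataset, and table names and the standalone `waitForTableReady` helper below are illustrative assumptions, not part of the patch.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"cloud.google.com/go/bigquery"
)

// waitForTableReady polls Table.Metadata until the call succeeds or the
// deadline passes, mirroring the retry loop added in
// BigQueryConnector.WaitForTableReady.
func waitForTableReady(ctx context.Context, table *bigquery.Table, maxWait, interval time.Duration) error {
	deadline := time.Now().Add(maxWait)
	for attempt := 0; ; attempt++ {
		if time.Now().After(deadline) {
			return fmt.Errorf("timeout reached while waiting for table %s to be ready", table.TableID)
		}
		if _, err := table.Metadata(ctx); err == nil {
			return nil // table is visible to the API, safe to query
		}
		log.Printf("waiting for table %s to be ready, attempt %d", table.TableID, attempt)
		time.Sleep(interval)
	}
}

func main() {
	ctx := context.Background()
	// Project, dataset, and table names below are placeholders.
	client, err := bigquery.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	tbl := client.Dataset("my_dataset").Table("staging_table")
	if err := waitForTableReady(ctx, tbl, 5*time.Minute, 15*time.Second); err != nil {
		log.Fatal(err)
	}
}
```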