From 8ec9475e62ecfd6e2b9308036f456d8d8f3771ba Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 12 Dec 2023 14:01:55 -0500 Subject: [PATCH] avro loader improvements bq --- flow/connectors/bigquery/qrep_avro_sync.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index c31442ed2f..129becc01c 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -391,6 +391,7 @@ func (s *QRepAvroSyncMethod) writeToStage( loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(avroRef) loader.UseAvroLogicalTypes = true + loader.WriteDisposition = bigquery.WriteTruncate job, err := loader.Run(s.connector.ctx) if err != nil { return 0, fmt.Errorf("failed to run BigQuery load job: %w", err) @@ -401,10 +402,14 @@ func (s *QRepAvroSyncMethod) writeToStage( return 0, fmt.Errorf("failed to wait for BigQuery load job: %w", err) } + if len(status.Errors) > 0 { + return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %v", status.Errors) + } + if err := status.Err(); err != nil { return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err) } - log.Printf("Pushed into %s/%s", - avroFile.FilePath, syncID) + + log.Infof("Pushed into %s/%s", avroFile.FilePath, syncID) return avroFile.NumRecords, nil }