Skip to content

Commit

Permalink
avro loader improvements bq
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 12, 2023
1 parent f29deb4 commit 8ec9475
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func (s *QRepAvroSyncMethod) writeToStage(

loader := bqClient.Dataset(datasetID).Table(stagingTable).LoaderFrom(avroRef)
loader.UseAvroLogicalTypes = true
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(s.connector.ctx)
if err != nil {
return 0, fmt.Errorf("failed to run BigQuery load job: %w", err)
Expand All @@ -401,10 +402,14 @@ func (s *QRepAvroSyncMethod) writeToStage(
return 0, fmt.Errorf("failed to wait for BigQuery load job: %w", err)
}

if len(status.Errors) > 0 {
return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %v", status.Errors)
}

if err := status.Err(); err != nil {
return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
}
log.Printf("Pushed into %s/%s",
avroFile.FilePath, syncID)

log.Infof("Pushed into %s/%s", avroFile.FilePath, syncID)
return avroFile.NumRecords, nil
}

0 comments on commit 8ec9475

Please sign in to comment.