Skip to content

Commit

Permalink
fixed issue, and BigQuery compression is dead
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 22, 2023
1 parent e93258c commit abb795e
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
avro.CompressNone, qvalue.QDWHTypeBigQuery)
if s.gcsBucket != "" {
bucket := s.connector.storageClient.Bucket(s.gcsBucket)
avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", objectFolder, syncID)
avroFilePath = fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
obj := bucket.Object(avroFilePath)
w := obj.NewWriter(s.connector.ctx)

Expand All @@ -346,7 +346,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
return 0, fmt.Errorf("failed to create temp dir: %w", err)
}

avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", tmpDir, syncID)
avroFilePath = fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
log.WithFields(log.Fields{
"batchOrPartitionID": syncID,
}).Infof("writing records to local file %s", avroFilePath)
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 3,
TotalSyncFlows: 2,
MaxBatchSize: 100,
}

Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsSt

// read rows from destination table
qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
bqRows, err := s.bqHelper.ExecuteAndProcessQuery(
fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName),
)
bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName)
fmt.Printf("running query on bigquery: %s\n", bqSelQuery)
bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
s.NoError(err)

s.True(pgRows.Equals(bqRows), "rows from source and destination tables are not equal")
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment,
log.Errorln(err)
}

if state.SetupComplete {
if state.SnapshotComplete {
break
}
} else {
Expand Down

0 comments on commit abb795e

Please sign in to comment.