Skip to content

Commit

Permalink
increase reading time and giving Snappy another chance
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 17, 2023
1 parent ecbfdf0 commit e681cef
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,4 @@ jobs:
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10
5 changes: 4 additions & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
var avroFilePath string
numRecords, err := func() (int, error) {
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
avro.CompressDeflate, qvalue.QDWHTypeBigQuery)
avro.CompressSnappy, qvalue.QDWHTypeBigQuery)
if s.gcsBucket != "" {
bucket := s.connector.storageClient.Bucket(s.gcsBucket)
avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", objectFolder, syncID)
Expand Down Expand Up @@ -360,6 +360,9 @@ func (s *QRepAvroSyncMethod) writeToStage(
if err != nil {
return 0, err
}
if numRecords == 0 {
return 0, nil
}
log.WithFields(log.Fields{
"batchOrPartitionID": syncID,
}).Infof("wrote %d records to file %s", numRecords, avroFilePath)
Expand Down
4 changes: 4 additions & 0 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) {

t.Run(tt.name, tt.test)
}

t.Cleanup(func() {
s.TearDownSuite()
})
}

func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string {
Expand Down

0 comments on commit e681cef

Please sign in to comment.