From e681cef7744d24b60ae1c8895da8b9a9156dbacc Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 17 Nov 2023 18:50:24 +0530 Subject: [PATCH] increase reading time and giving Snappy another chance --- .github/workflows/flow.yml | 2 +- flow/connectors/bigquery/qrep_avro_sync.go | 5 ++++- flow/e2e/snowflake/peer_flow_sf_test.go | 4 ++++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index a0a1889838..5c39d9a166 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -120,4 +120,4 @@ jobs: PEERDB_CATALOG_USER: postgres PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres - PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3 + PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10 diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index e0c4da5556..c7ce8ce198 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -328,7 +328,7 @@ func (s *QRepAvroSyncMethod) writeToStage( var avroFilePath string numRecords, err := func() (int, error) { ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, - avro.CompressDeflate, qvalue.QDWHTypeBigQuery) + avro.CompressSnappy, qvalue.QDWHTypeBigQuery) if s.gcsBucket != "" { bucket := s.connector.storageClient.Bucket(s.gcsBucket) avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", objectFolder, syncID) @@ -360,6 +360,9 @@ func (s *QRepAvroSyncMethod) writeToStage( if err != nil { return 0, err } + if numRecords == 0 { + return 0, nil + } log.WithFields(log.Fields{ "batchOrPartitionID": syncID, }).Infof("wrote %d records to file %s", numRecords, avroFilePath) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 219f37c493..230f9d1b38 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -65,6 +65,10 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { t.Run(tt.name, tt.test) } + + t.Cleanup(func() { + s.TearDownSuite() + }) } func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string {