diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 65c6ce588..dc16f5fbe 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -120,4 +120,4 @@ jobs: PEERDB_CATALOG_USER: postgres PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres - PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3 + PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10 diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index e0c4da555..c7ce8ce19 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -328,7 +328,7 @@ func (s *QRepAvroSyncMethod) writeToStage( var avroFilePath string numRecords, err := func() (int, error) { ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, - avro.CompressDeflate, qvalue.QDWHTypeBigQuery) + avro.CompressSnappy, qvalue.QDWHTypeBigQuery) if s.gcsBucket != "" { bucket := s.connector.storageClient.Bucket(s.gcsBucket) avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", objectFolder, syncID) @@ -360,6 +360,9 @@ func (s *QRepAvroSyncMethod) writeToStage( if err != nil { return 0, err } + if numRecords == 0 { + return 0, nil + } log.WithFields(log.Fields{ "batchOrPartitionID": syncID, }).Infof("wrote %d records to file %s", numRecords, avroFilePath) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 907da8ed8..49aee77b2 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -66,6 +66,10 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { t.Run(tt.name, tt.test) } + + t.Cleanup(func() { + s.TearDownSuite() + }) } func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string {