diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 7a44352fc0..e0c4da5556 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -328,7 +328,7 @@ func (s *QRepAvroSyncMethod) writeToStage( var avroFilePath string numRecords, err := func() (int, error) { ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, - avro.CompressSnappy, qvalue.QDWHTypeBigQuery) + avro.CompressDeflate, qvalue.QDWHTypeBigQuery) if s.gcsBucket != "" { bucket := s.connector.storageClient.Bucket(s.gcsBucket) avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", objectFolder, syncID) @@ -370,6 +370,7 @@ func (s *QRepAvroSyncMethod) writeToStage( if s.gcsBucket != "" { gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath)) gcsRef.SourceFormat = bigquery.Avro + gcsRef.Compression = bigquery.Deflate avroRef = gcsRef } else { fh, err := os.Open(avroFilePath) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 9183762cde..e110b451da 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -68,7 +68,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { s.bqHelper.Peer, "peerdb_staging") s.NoError(err) - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 52386711cf..df1653b992 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -171,7 +171,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { ) s.NoError(err) - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index c3845f5f16..2fca18a700 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -114,7 +114,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { s.NoError(err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -164,7 +164,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { qrepConfig.InitialCopyOnly = true qrepConfig.WatermarkColumn = "ctid" - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index c1516ca8df..82901beac2 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -81,7 +81,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { ) s.NoError(err) - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -126,7 +126,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() } s.NoError(err) - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -168,7 +168,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { s.NoError(err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -212,7 +212,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { qrepConfig.WatermarkColumn = "xmin" s.NoError(err) - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -256,7 +256,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( s.NoError(err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 6a70377ac4..9c2b27bcb0 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -168,7 +168,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append WaitBetweenBatchesSeconds: 5, } - e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig) + e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index d307039cdd..26db73f765 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -297,18 +297,10 @@ func CreateQRepWorkflowConfig( return qrepConfig, nil } -func RunQrepFlowWorkflow(suite testsuite.WorkflowTestSuite, config *protos.QRepConfig) bool { - env := suite.NewTestWorkflowEnvironment() - RegisterWorkflowsAndActivities(env) +func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) { state := peerflow.NewQRepFlowStateForTesting() + time.Sleep(5 * time.Second) env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state) - if !env.IsWorkflowCompleted() { - return false - } - env = suite.NewTestWorkflowEnvironment() - RegisterWorkflowsAndActivities(env) - env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state) - return env.IsWorkflowCompleted() } func GetOwnersSchema() *model.QRecordSchema {