Skip to content

Commit

Permalink
trying DEFLATE and delaying QRep
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 16, 2023
1 parent 58fcb87 commit 97ef4d8
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 21 deletions.
3 changes: 2 additions & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
var avroFilePath string
numRecords, err := func() (int, error) {
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
avro.CompressSnappy, qvalue.QDWHTypeBigQuery)
avro.CompressDeflate, qvalue.QDWHTypeBigQuery)
if s.gcsBucket != "" {
bucket := s.connector.storageClient.Bucket(s.gcsBucket)
avroFilePath = fmt.Sprintf("%s/%s.avro.snappy", objectFolder, syncID)
Expand Down Expand Up @@ -370,6 +370,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
if s.gcsBucket != "" {
gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", s.gcsBucket, avroFilePath))
gcsRef.SourceFormat = bigquery.Avro
gcsRef.Compression = bigquery.Deflate
avroRef = gcsRef
} else {
fh, err := os.Open(avroFilePath)
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
s.bqHelper.Peer,
"peerdb_staging")
s.NoError(err)
e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
)
s.NoError(err)

e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/s3/qrep_flow_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
s.NoError(err)
qrepConfig.StagingPath = s.s3Helper.s3Config.Url

e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down Expand Up @@ -164,7 +164,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
qrepConfig.InitialCopyOnly = true
qrepConfig.WatermarkColumn = "ctid"

e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down
10 changes: 5 additions & 5 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {
)
s.NoError(err)

e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
}
s.NoError(err)

e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down Expand Up @@ -168,7 +168,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
s.NoError(err)
qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New())

e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down Expand Up @@ -212,7 +212,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
qrepConfig.WatermarkColumn = "xmin"
s.NoError(err)

e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down Expand Up @@ -256,7 +256,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(
s.NoError(err)
qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New())

e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append
WaitBetweenBatchesSeconds: 5,
}

e2e.RunQrepFlowWorkflow(s.WorkflowTestSuite, qrepConfig)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down
12 changes: 2 additions & 10 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,10 @@ func CreateQRepWorkflowConfig(
return qrepConfig, nil
}

func RunQrepFlowWorkflow(suite testsuite.WorkflowTestSuite, config *protos.QRepConfig) bool {
env := suite.NewTestWorkflowEnvironment()
RegisterWorkflowsAndActivities(env)
func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) {
state := peerflow.NewQRepFlowStateForTesting()
time.Sleep(5 * time.Second)
env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state)
if !env.IsWorkflowCompleted() {
return false
}
env = suite.NewTestWorkflowEnvironment()
RegisterWorkflowsAndActivities(env)
env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state)
return env.IsWorkflowCompleted()
}

func GetOwnersSchema() *model.QRecordSchema {
Expand Down

0 comments on commit 97ef4d8

Please sign in to comment.