Skip to content

Commit

Permalink
replaces totalsyncflows with exitafterrecords
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 29, 2023
1 parent 2aaf40f commit 4f11f98
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 80 deletions.
1 change: 1 addition & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(

limits := &peerflow.CDCFlowLimits{
TotalSyncFlows: 0,
ExitAfterRecords: -1,
TotalNormalizeFlows: 0,
MaxBatchSize: maxBatchSize,
}
Expand Down
60 changes: 30 additions & 30 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {

// TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores.
limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 1,
ExitAfterRecords: 0,
MaxBatchSize: 1,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil)
Expand Down Expand Up @@ -156,8 +156,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 1,
ExitAfterRecords: 0,
MaxBatchSize: 1,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -201,8 +201,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 1,
ExitAfterRecords: 0,
MaxBatchSize: 1,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -249,8 +249,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -318,8 +318,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -387,8 +387,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 0,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -449,8 +449,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 100,
ExitAfterRecords: 11,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -523,8 +523,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 6,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -592,8 +592,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -661,8 +661,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {

limits := peerflow.CDCFlowLimits{

TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -737,8 +737,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 2,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -799,8 +799,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 10,
MaxBatchSize: 100,
ExitAfterRecords: 1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -903,8 +903,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -978,8 +978,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 20,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -1056,8 +1056,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down
20 changes: 10 additions & 10 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -107,8 +107,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 10,
MaxBatchSize: 100,
ExitAfterRecords: 1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -271,8 +271,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -350,8 +350,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 20,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -431,8 +431,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down
8 changes: 4 additions & 4 deletions flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 5,
MaxBatchSize: 5,
ExitAfterRecords: 20,
MaxBatchSize: 5,
}

go func() {
Expand Down Expand Up @@ -115,8 +115,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 5,
MaxBatchSize: 5,
ExitAfterRecords: 20,
MaxBatchSize: 5,
}

go func() {
Expand Down
Loading

0 comments on commit 4f11f98

Please sign in to comment.