Skip to content

Commit

Permalink
replaces totalsyncflows with exitafterrecords
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 29, 2023
1 parent e80b7c0 commit b3343ef
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 80 deletions.
1 change: 1 addition & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(

limits := &peerflow.CDCFlowLimits{
TotalSyncFlows: 0,
ExitAfterRecords: -1,
TotalNormalizeFlows: 0,
MaxBatchSize: maxBatchSize,
}
Expand Down
60 changes: 30 additions & 30 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {

// TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores.
limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 1,
ExitAfterRecords: 0,
MaxBatchSize: 1,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil)
Expand Down Expand Up @@ -158,8 +158,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 1,
ExitAfterRecords: 0,
MaxBatchSize: 1,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -204,8 +204,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 1,
ExitAfterRecords: 0,
MaxBatchSize: 1,
}

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -253,8 +253,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -323,8 +323,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -393,8 +393,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 0,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -456,8 +456,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 1,
MaxBatchSize: 100,
ExitAfterRecords: 11,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -531,8 +531,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 6,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -601,8 +601,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -671,8 +671,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {

limits := peerflow.CDCFlowLimits{

TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -748,8 +748,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 2,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -811,8 +811,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 10,
MaxBatchSize: 100,
ExitAfterRecords: 1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -916,8 +916,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -992,8 +992,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 20,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -1071,8 +1071,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down
20 changes: 10 additions & 10 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -107,8 +107,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 10,
MaxBatchSize: 100,
ExitAfterRecords: 1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -271,8 +271,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -350,8 +350,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
ExitAfterRecords: 20,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -431,8 +431,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
MaxBatchSize: 100,
ExitAfterRecords: 10,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
Expand Down
8 changes: 4 additions & 4 deletions flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 5,
MaxBatchSize: 5,
ExitAfterRecords: 20,
MaxBatchSize: 5,
}

go func() {
Expand Down Expand Up @@ -115,8 +115,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 5,
MaxBatchSize: 5,
ExitAfterRecords: 20,
MaxBatchSize: 5,
}

go func() {
Expand Down
Loading

0 comments on commit b3343ef

Please sign in to comment.