From b3343ef2dc82a7c115a7d9a8e58afe2f87e361be Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 29 Nov 2023 22:41:25 +0530 Subject: [PATCH] replaces totalsyncflows with exitafterrecords --- flow/cmd/handler.go | 1 + flow/e2e/bigquery/peer_flow_bq_test.go | 60 ++++++++++----------- flow/e2e/postgres/peer_flow_pg_test.go | 20 +++---- flow/e2e/s3/cdc_s3_test.go | 8 +-- flow/e2e/snowflake/peer_flow_sf_test.go | 69 ++++++++++++------------- flow/workflows/cdc_flow.go | 10 ++++ 6 files changed, 88 insertions(+), 80 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 13ce8115b1..2e2f1838f4 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -137,6 +137,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( limits := &peerflow.CDCFlowLimits{ TotalSyncFlows: 0, + ExitAfterRecords: -1, TotalNormalizeFlows: 0, MaxBatchSize: maxBatchSize, } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 90fbb552dc..239de7f3ba 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -112,8 +112,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. 
limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil) @@ -158,8 +158,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -204,8 +204,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -253,8 +253,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -323,8 +323,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -393,8 +393,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 0, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -456,8 +456,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 100, + ExitAfterRecords: 11, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -531,8 +531,8 @@ func (s 
*PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 6, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -601,8 +601,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -671,8 +671,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -748,8 +748,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 2, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -811,8 +811,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -916,8 +916,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -992,8 +992,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ 
-1071,8 +1071,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 45666f0369..3f5718bb72 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -45,8 +45,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -107,8 +107,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -271,8 +271,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -350,8 +350,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -431,8 +431,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff 
--git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index aaca2a125a..5265316d90 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -44,8 +44,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 5, + ExitAfterRecords: 20, + MaxBatchSize: 5, } go func() { @@ -115,8 +115,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 5, + ExitAfterRecords: 20, + MaxBatchSize: 5, } go func() { diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 3c62490271..8c212cd991 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -150,15 +150,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 15 rows into the source table + // and then insert 20 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 15 rows into the source table + // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) @@ -167,7 +167,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { `, srcTableName), testKey, testValue) s.NoError(err) } - fmt.Println("Inserted 10 rows into the source table") + fmt.Println("Inserted 20 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -227,15 +227,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - 
TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 10 rows into the source table + // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,poly) VALUES ($1,$2) @@ -271,7 +271,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { s.Contains(err.Error(), "continue as new") // We inserted 4 invalid shapes in each. - // They should have filtered out as null on destination + // They should have been filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") s.NoError(err) s.Equal(6, lineCount) @@ -315,8 +315,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -386,12 +386,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 0, + MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ @@ -402,7 +400,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { END; `, srcTableName, srcTableName)) s.NoError(err) - fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -449,8 +446,8 @@ func (s *PeerFlowE2ETestSuiteSF) 
Test_Toast_Advance_1_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 11, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -524,8 +521,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 6, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -594,8 +591,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -664,8 +661,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -740,8 +737,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 2, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -800,8 +797,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -965,8 +962,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1041,8 +1038,8 @@ 
func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1119,8 +1116,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1201,8 +1198,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { } limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 753cff3804..bd786aff5a 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -33,6 +33,8 @@ type CDCFlowLimits struct { TotalNormalizeFlows int // Maximum number of rows in a sync flow batch. MaxBatchSize int + // Rows synced after which we can say a test is done. + ExitAfterRecords int } type CDCFlowWorkflowState struct { @@ -277,6 +279,7 @@ func CDCFlowWorkflowWithConfig( } currentSyncFlowNum := 0 + totalRecordsSynced := 0 for { // check and act on signals before a fresh flow starts. 
@@ -312,6 +315,12 @@ func CDCFlowWorkflowWithConfig( } currentSyncFlowNum++ + // exit once the synced-record total reaches the limit; a negative limit disables this check + if limits.ExitAfterRecords >= 0 && totalRecordsSynced >= limits.ExitAfterRecords { + w.logger.Warn("All the records have been synced successfully, so ending the flow") + break + } + syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName) if err != nil { return state, err @@ -342,6 +351,7 @@ func CDCFlowWorkflowWithConfig( state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) if childSyncFlowRes != nil { state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping + totalRecordsSynced += int(childSyncFlowRes.NumRecordsSynced) } }