diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 128bde255c..44fb5f01a6 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -142,6 +142,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( limits := &peerflow.CDCFlowLimits{ TotalSyncFlows: 0, + ExitAfterRecords: -1, TotalNormalizeFlows: 0, MaxBatchSize: maxBatchSize, } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 1618e84326..ba8c2cb54c 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -111,8 +111,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil) @@ -156,8 +156,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -201,8 +201,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -249,8 +249,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -318,8 +318,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -387,8 +387,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 0, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -449,8 +449,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 100, + ExitAfterRecords: 11, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -523,8 +523,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 6, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -592,8 +592,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -661,8 +661,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -737,8 +737,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 2, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -799,8 +799,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -903,8 +903,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -978,8 +978,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1056,8 +1056,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 45666f0369..3f5718bb72 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -45,8 +45,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -107,8 +107,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -271,8 +271,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -350,8 +350,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -431,8 +431,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index aaca2a125a..5265316d90 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -44,8 +44,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 5, + ExitAfterRecords: 20, + MaxBatchSize: 5, } go func() { @@ -115,8 +115,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 5, + ExitAfterRecords: 20, + MaxBatchSize: 5, } go func() { diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index e489545e3f..e5b7c588f9 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -149,15 +149,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 15 rows into the source table + // and then insert 20 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 15 rows into the source table + // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) @@ -166,7 +166,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { `, srcTableName), testKey, testValue) s.NoError(err) } - fmt.Println("Inserted 10 rows into the source table") + fmt.Println("Inserted 20 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -225,15 +225,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 10 rows into the source table + // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,poly) VALUES ($1,$2) @@ -269,7 +269,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { s.Contains(err.Error(), "continue as new") // We inserted 4 invalid shapes in each. - // They should have filtered out as null on destination + // They should have been filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") s.NoError(err) s.Equal(6, lineCount) @@ -312,8 +312,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -382,12 +382,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 0, + MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ @@ -398,7 +396,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { END; `, srcTableName, srcTableName)) s.NoError(err) - fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -444,8 +441,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 11, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -518,8 +515,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 6, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -587,8 +584,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -656,8 +653,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -731,8 +728,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 2, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -790,8 +787,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -954,8 +951,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1029,8 +1026,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1106,8 +1103,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1187,8 +1184,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { } limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 2dfe99ff25..531a3868ab 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -33,6 +33,8 @@ type CDCFlowLimits struct { TotalNormalizeFlows int // Maximum number of rows in a sync flow batch. MaxBatchSize int + // Rows synced after which we can say a test is done. + ExitAfterRecords int } type CDCFlowWorkflowState struct { @@ -289,6 +291,7 @@ func CDCFlowWorkflowWithConfig( } currentSyncFlowNum := 0 + totalRecordsSynced := 0 for { // check and act on signals before a fresh flow starts. @@ -324,6 +327,12 @@ func CDCFlowWorkflowWithConfig( } currentSyncFlowNum++ + // check if total records synced have been completed + if totalRecordsSynced == limits.ExitAfterRecords { + w.logger.Warn("All the records have been synced successfully, so ending the flow") + break + } + syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName) if err != nil { return state, err @@ -358,6 +367,7 @@ func CDCFlowWorkflowWithConfig( state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) if childSyncFlowRes != nil { state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping + totalRecordsSynced += int(childSyncFlowRes.NumRecordsSynced) } }