diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 128bde255c..44fb5f01a6 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -142,6 +142,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( limits := &peerflow.CDCFlowLimits{ TotalSyncFlows: 0, + ExitAfterRecords: -1, TotalNormalizeFlows: 0, MaxBatchSize: maxBatchSize, } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 6c0f6b4ac9..73c8ed528c 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -211,15 +211,6 @@ func (c *EventHubConnector) processBatch( } func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { - shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { - return fmt.Sprintf("syncing records to eventhub with"+ - " push parallelism %d and push batch size %d", - req.PushParallelism, req.PushBatchSize) - }) - defer func() { - shutdown <- true - }() - maxParallelism := req.PushParallelism if maxParallelism <= 0 { maxParallelism = 10 @@ -229,6 +220,16 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S batch := req.Records var numRecords uint32 + shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string { + return fmt.Sprintf( + "processed %d records for flow %s", + numRecords, req.FlowJobName, + ) + }) + defer func() { + shutdown <- true + }() + // if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true // we kick off processBatch in a goroutine and return immediately. // otherwise, we block until processBatch is done. diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 552f372a04..cfb0425515 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -164,9 +164,6 @@ func (p *PostgresCDCSource) consumeStream( clientXLogPos pglogrepl.LSN, records *model.CDCRecordStream, ) error { - standbyMessageTimeout := req.IdleTimeout - nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) - defer func() { err := conn.Close(p.ctx) if err != nil { @@ -211,6 +208,8 @@ func (p *PostgresCDCSource) consumeStream( }() tablePKeyLastSeen := make(map[model.TableWithPkey]int) + standbyMessageTimeout := req.IdleTimeout + nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) addRecord := func(rec model.Record) { records.AddRecord(rec) @@ -218,12 +217,17 @@ func (p *PostgresCDCSource) consumeStream( if len(localRecords) == 1 { records.SignalAsNotEmpty() + log.Infof("pushing the standby deadline to %s", time.Now().Add(standbyMessageTimeout)) + log.Infof("num records accumulated: %d", len(localRecords)) + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) } } + pkmRequiresResponse := false + waitingForCommit := false + for { - if time.Now().After(nextStandbyMessageDeadline) || - (len(localRecords) >= int(req.MaxBatchSize)) { + if pkmRequiresResponse { // Update XLogPos to the last processed position, we can only confirm // that this is the last row committed on the destination. err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, @@ -232,26 +236,64 @@ func (p *PostgresCDCSource) consumeStream( return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) } - numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords)) - if time.Since(standByLastLogged) > 10*time.Second { + numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(localRecords)) log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) standByLastLogged = time.Now() } - nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + pkmRequiresResponse = false + } - if !p.commitLock && (len(localRecords) >= int(req.MaxBatchSize)) { - return nil + if (len(localRecords) >= int(req.MaxBatchSize)) && !p.commitLock { + return nil + } + + if waitingForCommit && !p.commitLock { + log.Infof( + "[%s] commit received, returning currently accumulated records - %d", + req.FlowJobName, + len(localRecords), + ) + return nil + } + + // if we are past the next standby deadline (?) + if time.Now().After(nextStandbyMessageDeadline) { + if len(localRecords) > 0 { + log.Infof("[%s] standby deadline reached, have %d records, will return at next commit", + req.FlowJobName, + len(localRecords), + ) + + if !p.commitLock { + // immediate return if we are not waiting for a commit + return nil + } + + waitingForCommit = true + } else { + log.Infof("[%s] standby deadline reached, no records accumulated, continuing to wait", + req.FlowJobName, + ) } + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + } + + var ctx context.Context + var cancel context.CancelFunc + + if len(localRecords) == 0 { + ctx, cancel = context.WithCancel(p.ctx) + } else { + ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline) } - ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) rawMsg, err := conn.ReceiveMessage(ctx) cancel() if err != nil && !p.commitLock { if pgconn.Timeout(err) { - log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(localRecords)) + log.Infof("Stand-by deadline reached, returning currently accumulated records - %d", len(localRecords)) return nil } else { return fmt.Errorf("ReceiveMessage failed: %w", err) @@ -281,9 +323,10 @@ func (p *PostgresCDCSource) consumeStream( if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd } - if pkm.ReplyRequested { - nextStandbyMessageDeadline = time.Time{} - } + + // always reply to keepalive messages + // instead of `pkm.ReplyRequested` + pkmRequiresResponse = true case pglogrepl.XLogDataByteID: xld, err := pglogrepl.ParseXLogData(msg.Data[1:]) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 1618e84326..ba8c2cb54c 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -111,8 +111,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil) @@ -156,8 +156,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -201,8 +201,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 1, + ExitAfterRecords: 0, + MaxBatchSize: 1, } env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -249,8 +249,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -318,8 +318,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -387,8 +387,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 0, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -449,8 +449,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 1, - MaxBatchSize: 100, + ExitAfterRecords: 11, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -523,8 +523,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 6, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -592,8 +592,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -661,8 +661,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -737,8 +737,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 2, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -799,8 +799,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -903,8 +903,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -978,8 +978,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1056,8 +1056,8 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 45666f0369..3f5718bb72 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -45,8 +45,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -107,8 +107,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -271,8 +271,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -350,8 +350,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -431,8 +431,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index aaca2a125a..85993e2fe8 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -24,7 +24,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") - flowJobName := s.attachSuffix("test_simple_flow") + flowJobName := s.attachSuffix("test_simple_flow_s3") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, @@ -44,8 +44,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 5, + TotalSyncFlows: 4, + ExitAfterRecords: 20, + MaxBatchSize: 5, } go func() { @@ -95,7 +96,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") - flowJobName := s.attachSuffix("test_simple_flow") + flowJobName := s.attachSuffix("test_simple_flow_gcs_interop") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, @@ -115,8 +116,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 5, + TotalSyncFlows: 4, + ExitAfterRecords: 20, + MaxBatchSize: 5, } go func() { @@ -131,6 +133,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { `, srcTableName), testKey, testValue) s.NoError(err) } + fmt.Println("Inserted 20 rows into the source table") s.NoError(err) }() diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index e489545e3f..e5b7c588f9 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -149,15 +149,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 15 rows into the source table + // and then insert 20 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 15 rows into the source table + // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) @@ -166,7 +166,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { `, srcTableName), testKey, testValue) s.NoError(err) } - fmt.Println("Inserted 10 rows into the source table") + fmt.Println("Inserted 20 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -225,15 +225,15 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - // insert 10 rows into the source table + // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,poly) VALUES ($1,$2) @@ -269,7 +269,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { s.Contains(err.Error(), "continue as new") // We inserted 4 invalid shapes in each. - // They should have filtered out as null on destination + // They should have been filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") s.NoError(err) s.Equal(6, lineCount) @@ -312,8 +312,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -382,12 +382,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 0, + MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ @@ -398,7 +396,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { END; `, srcTableName, srcTableName)) s.NoError(err) - fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -444,8 +441,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 11, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -518,8 +515,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 6, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -587,8 +584,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 4, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -656,8 +653,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -731,8 +728,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 2, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -790,8 +787,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 10, - MaxBatchSize: 100, + ExitAfterRecords: 1, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -954,8 +951,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 5, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1029,8 +1026,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 20, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1106,8 +1103,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { s.NoError(err) limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup @@ -1187,8 +1184,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { } limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 2, - MaxBatchSize: 100, + ExitAfterRecords: 10, + MaxBatchSize: 100, } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 2dfe99ff25..eb9b42cc03 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -33,6 +33,8 @@ type CDCFlowLimits struct { TotalNormalizeFlows int // Maximum number of rows in a sync flow batch. MaxBatchSize int + // Rows synced after which we can say a test is done. + ExitAfterRecords int } type CDCFlowWorkflowState struct { @@ -289,6 +291,7 @@ func CDCFlowWorkflowWithConfig( } currentSyncFlowNum := 0 + totalRecordsSynced := 0 for { // check and act on signals before a fresh flow starts. @@ -324,6 +327,12 @@ func CDCFlowWorkflowWithConfig( } currentSyncFlowNum++ + // check if total records synced have been completed + if totalRecordsSynced == limits.ExitAfterRecords { + w.logger.Warn("All the records have been synced successfully, so ending the flow") + break + } + syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName) if err != nil { return state, err @@ -358,9 +367,12 @@ func CDCFlowWorkflowWithConfig( state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) if childSyncFlowRes != nil { state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping + totalRecordsSynced += int(childSyncFlowRes.NumRecordsSynced) } } + w.logger.Info("Total records synced: ", totalRecordsSynced) + normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) if err != nil { return state, err diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index d59de7cf69..0b207bc65f 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -83,7 +83,7 @@ func (s *SyncFlowExecution) executeSyncFlow( startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 72 * time.Hour, - HeartbeatTimeout: 5 * time.Minute, + HeartbeatTimeout: 30 * time.Second, }) // execute StartFlow on the peers to start the flow