diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 7b03c9de67..311913f3e5 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -138,10 +138,9 @@ func (h *FlowRequestHandler) CreateCDCFlow( } limits := &peerflow.CDCFlowLimits{ - TotalSyncFlows: 0, - ExitAfterRecords: -1, - TotalNormalizeFlows: 0, - MaxBatchSize: maxBatchSize, + TotalSyncFlows: 0, + ExitAfterRecords: -1, + MaxBatchSize: maxBatchSize, } if req.ConnectionConfigs.SoftDeleteColName == "" { diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 0a220ef424..725a44f65b 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -382,29 +382,32 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) { } } -func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", +func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (int64, int64, error) { + query := fmt.Sprintf("SELECT sync_batch_id, normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) q := c.client.Query(query) it, err := q.Read(c.ctx) if err != nil { err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - return -1, err + return -1, -1, err } var row []bigquery.Value err = it.Next(&row) if err != nil { c.logger.Info("no row found for job") - return 0, nil + return 0, 0, nil } - if row[0] == nil { - c.logger.Info("no normalize_batch_id found returning 0") - return 0, nil - } else { - return row[0].(int64), nil + syncBatchID := int64(0) + normBatchID := int64(0) + if row[0] != nil { + syncBatchID = row[0].(int64) } + if row[1] != nil { + normBatchID = row[1].(int64) + } + return syncBatchID, normBatchID, nil } func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -746,13 +749,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro( func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { rawTableName := c.getRawTableName(req.FlowJobName) - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) - if err != nil { - return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) - } - - // get last batchid that has been normalize - normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName) + syncBatchID, normalizeBatchID, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) if err != nil { return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } @@ -763,7 +760,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) } // if job is not yet found in the peerdb_mirror_jobs_table // OR sync is lagging end normalize - if !hasJob || normalizeBatchID == syncBatchID { + if !hasJob || normalizeBatchID >= syncBatchID { c.logger.Info("waiting for sync to catch up, so finishing") return &model.NormalizeResponse{ Done: false, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1152493b01..59a1b835bd 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -423,7 +423,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, err } // normalize has caught up with sync or no SyncFlow has run, chill until more records are loaded. - if syncBatchID == normalizeBatchID || !jobMetadataExists { + if normalizeBatchID >= syncBatchID || !jobMetadataExists { c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d", syncBatchID, normalizeBatchID)) return &model.NormalizeResponse{ diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index db13e188b8..bbed7d2a55 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -71,14 +71,14 @@ const ( checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=? and TABLE_NAME=?` - checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" - getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" - setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" - getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" - getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" - dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" - deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" - checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" + checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" + getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" + setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" + getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" + getLastSyncNormalizeBatchID_SQL = "SELECT SYNC_BATCH_ID, NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" + dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" + deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" + checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" ) type tableNameComponents struct { @@ -345,23 +345,23 @@ func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) { return result.Int64, nil } -func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema, +func (c *SnowflakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (int64, int64, error) { + rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncNormalizeBatchID_SQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) if err != nil { - return 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) + return 0, 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) } - var result pgtype.Int8 + var syncResult, normResult pgtype.Int8 if !rows.Next() { c.logger.Warn("No row found, returning 0") - return 0, nil + return 0, 0, nil } - err = rows.Scan(&result) + err = rows.Scan(&syncResult, &normResult) if err != nil { - return 0, fmt.Errorf("error while reading result row: %w", err) + return 0, 0, fmt.Errorf("error while reading result row: %w", err) } - return result.Int64, nil + return syncResult.Int64, normResult.Int64, nil } func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -590,16 +590,12 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( // NormalizeRecords normalizes raw table to destination table. func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) - if err != nil { - return nil, err - } - normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName) + syncBatchID, normalizeBatchID, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) if err != nil { return nil, err } // normalize has caught up with sync, chill until more records are loaded. - if syncBatchID == normalizeBatchID { + if normalizeBatchID >= syncBatchID { return &model.NormalizeResponse{ Done: false, StartBatchID: normalizeBatchID, diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e9091a6c3f..d81a418320 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -25,10 +25,6 @@ type CDCFlowLimits struct { // If 0, the number of sync flows will be continuously executed until the peer flow is cancelled. // This is typically non-zero for testing purposes. TotalSyncFlows int - // Number of normalize flows to execute in total. - // If 0, the number of sync flows will be continuously executed until the peer flow is cancelled. - // This is typically non-zero for testing purposes. - TotalNormalizeFlows int // Maximum number of rows in a sync flow batch. MaxBatchSize int // Rows synced after which we can say a test is done. @@ -443,17 +439,13 @@ func CDCFlowWorkflowWithConfig( cfg, ) - selector := workflow.NewSelector(ctx) - selector.AddFuture(childNormalizeFlowFuture, func(f workflow.Future) { - var childNormalizeFlowRes *model.NormalizeResponse - if err := f.Get(ctx, &childNormalizeFlowRes); err != nil { - w.logger.Error("failed to execute normalize flow: ", err) - state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) - } else { - state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) - } - }) - selector.Select(ctx) + var childNormalizeFlowRes *model.NormalizeResponse + if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil { + w.logger.Error("failed to execute normalize flow: ", err) + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) + } else { + state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) + } batchSizeSelector.Select(ctx) }