diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index dd922ba1f5..04a9ccd708 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -138,10 +138,9 @@ func (h *FlowRequestHandler) CreateCDCFlow( } limits := &peerflow.CDCFlowLimits{ - TotalSyncFlows: 0, - ExitAfterRecords: -1, - TotalNormalizeFlows: 0, - MaxBatchSize: maxBatchSize, + TotalSyncFlows: 0, + ExitAfterRecords: -1, + MaxBatchSize: maxBatchSize, } if req.ConnectionConfigs.SoftDeleteColName == "" { diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3da34f99d7..11a88e8382 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -382,29 +382,35 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) { } } -func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", +func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (model.SyncAndNormalizeBatchID, error) { + query := fmt.Sprintf("SELECT sync_batch_id, normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) q := c.client.Query(query) it, err := q.Read(c.ctx) if err != nil { err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - return -1, err + return model.SyncAndNormalizeBatchID{}, err } var row []bigquery.Value err = it.Next(&row) if err != nil { c.logger.Info("no row found for job") - return 0, nil + return model.SyncAndNormalizeBatchID{}, nil } - if row[0] == nil { - c.logger.Info("no normalize_batch_id found returning 0") - return 0, nil - } else { - return row[0].(int64), nil + syncBatchID := int64(0) + normBatchID := int64(0) + if row[0] != nil { + syncBatchID = row[0].(int64) } + if row[1] != nil { + normBatchID = row[1].(int64) + } + return model.SyncAndNormalizeBatchID{ + SyncBatchID: syncBatchID, + NormalizeBatchID: normBatchID, + }, nil } func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -736,13 +742,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro( func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { rawTableName := c.getRawTableName(req.FlowJobName) - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) - if err != nil { - return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) - } - - // get last batchid that has been normalize - normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName) + batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) if err != nil { return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } @@ -753,20 +753,28 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) } // if job is not yet found in the peerdb_mirror_jobs_table // OR sync is lagging end normalize - if !hasJob || normalizeBatchID == syncBatchID { + if !hasJob || batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID { c.logger.Info("waiting for sync to catch up, so finishing") return &model.NormalizeResponse{ Done: false, - StartBatchID: normalizeBatchID, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID, + EndBatchID: batchIDs.SyncBatchID, }, nil } - distinctTableNames, err := c.getDistinctTableNamesInBatch(req.FlowJobName, syncBatchID, normalizeBatchID) + distinctTableNames, err := c.getDistinctTableNamesInBatch( + req.FlowJobName, + batchIDs.SyncBatchID, + batchIDs.NormalizeBatchID, + ) if err != nil { return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err) } - tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, syncBatchID, normalizeBatchID) + tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols( + req.FlowJobName, + batchIDs.SyncBatchID, + batchIDs.NormalizeBatchID, + ) if err != nil { return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } @@ -786,8 +794,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) dstTableName: tableName, dstDatasetTable: dstDatasetTable, normalizedTableSchema: c.tableNameSchemaMapping[tableName], - syncBatchID: syncBatchID, - normalizeBatchID: normalizeBatchID, + syncBatchID: batchIDs.SyncBatchID, + normalizeBatchID: batchIDs.NormalizeBatchID, unchangedToastColumns: tableNametoUnchangedToastCols[tableName], peerdbCols: &protos.PeerDBColumns{ SoftDeleteColName: req.SoftDeleteColName, @@ -802,7 +810,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) // update metadata to make the last normalized batch id to the recent last sync batch id. updateMetadataStmt := fmt.Sprintf( "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", - c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName) + c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName) stmts = append(stmts, updateMetadataStmt) query := strings.Join(stmts, "\n") @@ -813,8 +821,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return &model.NormalizeResponse{ Done: true, - StartBatchID: normalizeBatchID + 1, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID + 1, + EndBatchID: batchIDs.SyncBatchID, }, nil } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1152493b01..59a1b835bd 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -423,7 +423,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, err } // normalize has caught up with sync or no SyncFlow has run, chill until more records are loaded. - if syncBatchID == normalizeBatchID || !jobMetadataExists { + if normalizeBatchID >= syncBatchID || !jobMetadataExists { c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d", syncBatchID, normalizeBatchID)) return &model.NormalizeResponse{ diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index db13e188b8..5d2c7e03b1 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -71,14 +71,14 @@ const ( checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=? and TABLE_NAME=?` - checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" - getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" - setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" - getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" - getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" - dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" - deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" - checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" + checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" + getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" + setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" + getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" + getLastSyncNormalizeBatchID_SQL = "SELECT SYNC_BATCH_ID, NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" + dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" + deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" + checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" ) type tableNameComponents struct { @@ -345,23 +345,27 @@ func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) { return result.Int64, nil } -func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema, +func (c *SnowflakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (model.SyncAndNormalizeBatchID, error) { + rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncNormalizeBatchID_SQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) if err != nil { - return 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) + return model.SyncAndNormalizeBatchID{}, + fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) } - var result pgtype.Int8 + var syncResult, normResult pgtype.Int8 if !rows.Next() { c.logger.Warn("No row found, returning 0") - return 0, nil + return model.SyncAndNormalizeBatchID{}, nil } - err = rows.Scan(&result) + err = rows.Scan(&syncResult, &normResult) if err != nil { - return 0, fmt.Errorf("error while reading result row: %w", err) + return model.SyncAndNormalizeBatchID{}, fmt.Errorf("error while reading result row: %w", err) } - return result.Int64, nil + return model.SyncAndNormalizeBatchID{ + SyncBatchID: syncResult.Int64, + NormalizeBatchID: normResult.Int64, + }, nil } func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -590,20 +594,16 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( // NormalizeRecords normalizes raw table to destination table. func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) - if err != nil { - return nil, err - } - normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName) + batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) if err != nil { return nil, err } // normalize has caught up with sync, chill until more records are loaded. - if syncBatchID == normalizeBatchID { + if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID { return &model.NormalizeResponse{ Done: false, - StartBatchID: normalizeBatchID, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID, + EndBatchID: batchIDs.SyncBatchID, }, nil } @@ -617,12 +617,16 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest Done: false, }, nil } - destinationTableNames, err := c.getDistinctTableNamesInBatch(req.FlowJobName, syncBatchID, normalizeBatchID) + destinationTableNames, err := c.getDistinctTableNamesInBatch( + req.FlowJobName, + batchIDs.SyncBatchID, + batchIDs.NormalizeBatchID, + ) if err != nil { return nil, err } - tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, syncBatchID, normalizeBatchID) + tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, batchIDs.SyncBatchID, batchIDs.NormalizeBatchID) if err != nil { return nil, fmt.Errorf("couldn't tablename to unchanged cols mapping: %w", err) } @@ -640,7 +644,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest tableName, tableNametoUnchangedToastCols[tableName], getRawTableIdentifier(req.FlowJobName), - syncBatchID, normalizeBatchID, + batchIDs.SyncBatchID, batchIDs.NormalizeBatchID, req) if err != nil { c.logger.Error("[merge] error while normalizing records", slog.Any("error", err)) @@ -657,15 +661,15 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest } // updating metadata with new normalizeBatchID - err = c.updateNormalizeMetadata(req.FlowJobName, syncBatchID) + err = c.updateNormalizeMetadata(req.FlowJobName, batchIDs.SyncBatchID) if err != nil { return nil, err } return &model.NormalizeResponse{ Done: true, - StartBatchID: normalizeBatchID + 1, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID + 1, + EndBatchID: batchIDs.SyncBatchID, }, nil } diff --git a/flow/model/model.go b/flow/model/model.go index fc2c12d849..b579ccfb56 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -416,6 +416,11 @@ func (r *CDCRecordStream) GetRecords() chan Record { return r.records } +type SyncAndNormalizeBatchID struct { + SyncBatchID int64 + NormalizeBatchID int64 +} + type SyncRecordsRequest struct { Records *CDCRecordStream // FlowJobName is the name of the flow job. diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e9091a6c3f..55751bd096 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -25,10 +25,6 @@ type CDCFlowLimits struct { // If 0, the number of sync flows will be continuously executed until the peer flow is cancelled. // This is typically non-zero for testing purposes. TotalSyncFlows int - // Number of normalize flows to execute in total. - // If 0, the number of sync flows will be continuously executed until the peer flow is cancelled. - // This is typically non-zero for testing purposes. - TotalNormalizeFlows int // Maximum number of rows in a sync flow batch. MaxBatchSize int // Rows synced after which we can say a test is done. @@ -160,6 +156,7 @@ func CDCFlowWorkflowWithConfig( return nil, fmt.Errorf("invalid connection configs") } + ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) w := NewCDCFlowWorkflowExecution(ctx) if limits.TotalSyncFlows == 0 { @@ -174,6 +171,10 @@ func CDCFlowWorkflowWithConfig( return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err) } + mirrorNameSearch := map[string]interface{}{ + shared.MirrorNameSearchAttribute: cfg.FlowJobName, + } + // we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead @@ -189,10 +190,6 @@ func CDCFlowWorkflowWithConfig( } } - mirrorNameSearch := map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - } - // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName) @@ -208,7 +205,6 @@ func CDCFlowWorkflowWithConfig( SearchAttributes: mirrorNameSearch, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) - setupFlowCtx = workflow.WithValue(setupFlowCtx, "flowName", cfg.FlowJobName) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) @@ -236,7 +232,6 @@ func CDCFlowWorkflowWithConfig( SearchAttributes: mirrorNameSearch, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) - snapshotFlowCtx = workflow.WithValue(snapshotFlowCtx, "flowName", cfg.FlowJobName) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) @@ -270,7 +265,6 @@ func CDCFlowWorkflowWithConfig( StartToCloseTimeout: 12 * time.Hour, HeartbeatTimeout: 1 * time.Hour, }) - renameTablesCtx = workflow.WithValue(renameTablesCtx, "flowName", cfg.FlowJobName) renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil { return state, fmt.Errorf("failed to execute rename tables activity: %w", err) @@ -351,9 +345,6 @@ func CDCFlowWorkflowWithConfig( return state, err } - mirrorNameSearch := map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - } // execute the sync flow as a child workflow childSyncFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: syncFlowID, @@ -363,18 +354,17 @@ func CDCFlowWorkflowWithConfig( }, SearchAttributes: mirrorNameSearch, } - ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts) - ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) + syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping childSyncFlowFuture := workflow.ExecuteChildWorkflow( - ctx, + syncCtx, SyncFlowWorkflow, cfg, syncFlowOptions, ) var childSyncFlowRes *model.SyncResponse - if err := childSyncFlowFuture.Get(ctx, &childSyncFlowRes); err != nil { + if err := childSyncFlowFuture.Get(syncCtx, &childSyncFlowRes); err != nil { w.logger.Error("failed to execute sync flow: ", err) state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } else { @@ -387,20 +377,6 @@ func CDCFlowWorkflowWithConfig( w.logger.Info("Total records synced: ", totalRecordsSynced) - normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) - if err != nil { - return state, err - } - - childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ - WorkflowID: normalizeFlowID, - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - RetryPolicy: &temporal.RetryPolicy{ - MaximumAttempts: 20, - }, - SearchAttributes: mirrorNameSearch, - } - ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) var tableSchemaDeltas []*protos.TableSchemaDelta = nil if childSyncFlowRes != nil { tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas @@ -419,7 +395,6 @@ func CDCFlowWorkflowWithConfig( getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) - getModifiedSchemaCtx = workflow.WithValue(getModifiedSchemaCtx, "flowName", cfg.FlowJobName) getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, &protos.GetTableSchemaBatchInput{ PeerConnectionConfig: cfg.Source, @@ -436,24 +411,34 @@ func CDCFlowWorkflowWithConfig( } } } - ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) + + normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) + if err != nil { + return state, err + } + + childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ + WorkflowID: normalizeFlowID, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 20, + }, + SearchAttributes: mirrorNameSearch, + } + normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( - ctx, + normCtx, NormalizeFlowWorkflow, cfg, ) - selector := workflow.NewSelector(ctx) - selector.AddFuture(childNormalizeFlowFuture, func(f workflow.Future) { - var childNormalizeFlowRes *model.NormalizeResponse - if err := f.Get(ctx, &childNormalizeFlowRes); err != nil { - w.logger.Error("failed to execute normalize flow: ", err) - state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) - } else { - state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) - } - }) - selector.Select(ctx) + var childNormalizeFlowRes *model.NormalizeResponse + if err := childNormalizeFlowFuture.Get(normCtx, &childNormalizeFlowRes); err != nil { + w.logger.Error("failed to execute normalize flow: ", err) + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) + } else { + state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) + } batchSizeSelector.Select(ctx) } diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index af14e11b8f..39256eac1a 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -51,7 +51,6 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( HeartbeatTimeout: 5 * time.Minute, }) - // execute StartFlow on the peers to start the flow startNormalizeInput := &protos.StartNormalizeInput{ FlowConnectionConfigs: config, }