From 458abc12c228a748dfbcceeceb17bff63fc06e01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sun, 24 Dec 2023 00:59:08 +0000 Subject: [PATCH 1/3] Clean up code, fetch syncbatchid/normalizebatchid together --- flow/cmd/handler.go | 7 ++--- flow/connectors/bigquery/bigquery.go | 31 +++++++++----------- flow/connectors/postgres/postgres.go | 2 +- flow/connectors/snowflake/snowflake.go | 40 ++++++++++++-------------- flow/workflows/cdc_flow.go | 22 +++++--------- 5 files changed, 43 insertions(+), 59 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 7b03c9de67..311913f3e5 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -138,10 +138,9 @@ func (h *FlowRequestHandler) CreateCDCFlow( } limits := &peerflow.CDCFlowLimits{ - TotalSyncFlows: 0, - ExitAfterRecords: -1, - TotalNormalizeFlows: 0, - MaxBatchSize: maxBatchSize, + TotalSyncFlows: 0, + ExitAfterRecords: -1, + MaxBatchSize: maxBatchSize, } if req.ConnectionConfigs.SoftDeleteColName == "" { diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 0a220ef424..725a44f65b 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -382,29 +382,32 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) { } } -func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", +func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (int64, int64, error) { + query := fmt.Sprintf("SELECT sync_batch_id, normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) q := c.client.Query(query) it, err := q.Read(c.ctx) if err != nil { err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - return -1, err + return -1, -1, err } var row []bigquery.Value err = it.Next(&row) if err != nil { c.logger.Info("no row found for job") - return 0, nil + return 0, 0, nil } - if row[0] == nil { - c.logger.Info("no normalize_batch_id found returning 0") - return 0, nil - } else { - return row[0].(int64), nil + syncBatchID := int64(0) + normBatchID := int64(0) + if row[0] != nil { + syncBatchID = row[0].(int64) } + if row[1] != nil { + normBatchID = row[1].(int64) + } + return syncBatchID, normBatchID, nil } func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -746,13 +749,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro( func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { rawTableName := c.getRawTableName(req.FlowJobName) - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) - if err != nil { - return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) - } - - // get last batchid that has been normalize - normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName) + syncBatchID, normalizeBatchID, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) if err != nil { return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } @@ -763,7 +760,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) } // if job is not yet found in the peerdb_mirror_jobs_table // OR sync is lagging end normalize - if !hasJob || normalizeBatchID == syncBatchID { + if !hasJob || normalizeBatchID >= syncBatchID { c.logger.Info("waiting for sync to catch up, so finishing") return &model.NormalizeResponse{ Done: false, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1152493b01..59a1b835bd 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -423,7 +423,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, err } // normalize has caught up with sync or no SyncFlow has run, chill until more records are loaded. - if syncBatchID == normalizeBatchID || !jobMetadataExists { + if normalizeBatchID >= syncBatchID || !jobMetadataExists { c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d", syncBatchID, normalizeBatchID)) return &model.NormalizeResponse{ diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index db13e188b8..bbed7d2a55 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -71,14 +71,14 @@ const ( checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=? and TABLE_NAME=?` - checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" - getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" - setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" - getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" - getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" - dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" - deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" - checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" + checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" + getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" + setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" + getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" + getLastSyncNormalizeBatchID_SQL = "SELECT SYNC_BATCH_ID, NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" + dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" + deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" + checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" ) type tableNameComponents struct { @@ -345,23 +345,23 @@ func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) { return result.Int64, nil } -func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema, +func (c *SnowflakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (int64, int64, error) { + rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncNormalizeBatchID_SQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) if err != nil { - return 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) + return 0, 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) } - var result pgtype.Int8 + var syncResult, normResult pgtype.Int8 if !rows.Next() { c.logger.Warn("No row found, returning 0") - return 0, nil + return 0, 0, nil } - err = rows.Scan(&result) + err = rows.Scan(&syncResult, &normResult) if err != nil { - return 0, fmt.Errorf("error while reading result row: %w", err) + return 0, 0, fmt.Errorf("error while reading result row: %w", err) } - return result.Int64, nil + return syncResult.Int64, normResult.Int64, nil } func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -590,16 +590,12 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( // NormalizeRecords normalizes raw table to destination table. func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) - if err != nil { - return nil, err - } - normalizeBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName) + syncBatchID, normalizeBatchID, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) if err != nil { return nil, err } // normalize has caught up with sync, chill until more records are loaded. - if syncBatchID == normalizeBatchID { + if normalizeBatchID >= syncBatchID { return &model.NormalizeResponse{ Done: false, StartBatchID: normalizeBatchID, diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e9091a6c3f..d81a418320 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -25,10 +25,6 @@ type CDCFlowLimits struct { // If 0, the number of sync flows will be continuously executed until the peer flow is cancelled. // This is typically non-zero for testing purposes. TotalSyncFlows int - // Number of normalize flows to execute in total. - // If 0, the number of sync flows will be continuously executed until the peer flow is cancelled. - // This is typically non-zero for testing purposes. - TotalNormalizeFlows int // Maximum number of rows in a sync flow batch. MaxBatchSize int // Rows synced after which we can say a test is done. @@ -443,17 +439,13 @@ func CDCFlowWorkflowWithConfig( cfg, ) - selector := workflow.NewSelector(ctx) - selector.AddFuture(childNormalizeFlowFuture, func(f workflow.Future) { - var childNormalizeFlowRes *model.NormalizeResponse - if err := f.Get(ctx, &childNormalizeFlowRes); err != nil { - w.logger.Error("failed to execute normalize flow: ", err) - state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) - } else { - state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) - } - }) - selector.Select(ctx) + var childNormalizeFlowRes *model.NormalizeResponse + if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil { + w.logger.Error("failed to execute normalize flow: ", err) + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) + } else { + state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes) + } batchSizeSelector.Select(ctx) } From a50a2dcb3817e199de789b068ccf0399180e98d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sun, 24 Dec 2023 01:54:09 +0000 Subject: [PATCH 2/3] cleanup context management --- flow/workflows/cdc_flow.go | 57 ++++++++++++++------------------ flow/workflows/normalize_flow.go | 1 - 2 files changed, 25 insertions(+), 33 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index d81a418320..55751bd096 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -156,6 +156,7 @@ func CDCFlowWorkflowWithConfig( return nil, fmt.Errorf("invalid connection configs") } + ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) w := NewCDCFlowWorkflowExecution(ctx) if limits.TotalSyncFlows == 0 { @@ -170,6 +171,10 @@ func CDCFlowWorkflowWithConfig( return state, fmt.Errorf("failed to set `%s` query handler: %w", CDCFlowStatusQuery, err) } + mirrorNameSearch := map[string]interface{}{ + shared.MirrorNameSearchAttribute: cfg.FlowJobName, + } + // we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead @@ -185,10 +190,6 @@ func CDCFlowWorkflowWithConfig( } } - mirrorNameSearch := map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - } - // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName) @@ -204,7 +205,6 @@ func CDCFlowWorkflowWithConfig( SearchAttributes: mirrorNameSearch, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) - setupFlowCtx = workflow.WithValue(setupFlowCtx, "flowName", cfg.FlowJobName) setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) @@ -232,7 +232,6 @@ func CDCFlowWorkflowWithConfig( SearchAttributes: mirrorNameSearch, } snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) - snapshotFlowCtx = workflow.WithValue(snapshotFlowCtx, "flowName", cfg.FlowJobName) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil { return state, fmt.Errorf("failed to execute child workflow: %w", err) @@ -266,7 +265,6 @@ func CDCFlowWorkflowWithConfig( StartToCloseTimeout: 12 * time.Hour, HeartbeatTimeout: 1 * time.Hour, }) - renameTablesCtx = workflow.WithValue(renameTablesCtx, "flowName", cfg.FlowJobName) renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil { return state, fmt.Errorf("failed to execute rename tables activity: %w", err) @@ -347,9 +345,6 @@ func CDCFlowWorkflowWithConfig( return state, err } - mirrorNameSearch := map[string]interface{}{ - shared.MirrorNameSearchAttribute: cfg.FlowJobName, - } // execute the sync flow as a child workflow childSyncFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: syncFlowID, @@ -359,18 +354,17 @@ func CDCFlowWorkflowWithConfig( }, SearchAttributes: mirrorNameSearch, } - ctx = workflow.WithChildOptions(ctx, childSyncFlowOpts) - ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) + syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) syncFlowOptions.RelationMessageMapping = *state.RelationMessageMapping childSyncFlowFuture := workflow.ExecuteChildWorkflow( - ctx, + syncCtx, SyncFlowWorkflow, cfg, syncFlowOptions, ) var childSyncFlowRes *model.SyncResponse - if err := childSyncFlowFuture.Get(ctx, &childSyncFlowRes); err != nil { + if err := childSyncFlowFuture.Get(syncCtx, &childSyncFlowRes); err != nil { w.logger.Error("failed to execute sync flow: ", err) state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) } else { @@ -383,20 +377,6 @@ func CDCFlowWorkflowWithConfig( w.logger.Info("Total records synced: ", totalRecordsSynced) - normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) - if err != nil { - return state, err - } - - childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ - WorkflowID: normalizeFlowID, - ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, - RetryPolicy: &temporal.RetryPolicy{ - MaximumAttempts: 20, - }, - SearchAttributes: mirrorNameSearch, - } - ctx = workflow.WithChildOptions(ctx, childNormalizeFlowOpts) var tableSchemaDeltas []*protos.TableSchemaDelta = nil if childSyncFlowRes != nil { tableSchemaDeltas = childSyncFlowRes.TableSchemaDeltas @@ -415,7 +395,6 @@ func CDCFlowWorkflowWithConfig( getModifiedSchemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) - getModifiedSchemaCtx = workflow.WithValue(getModifiedSchemaCtx, "flowName", cfg.FlowJobName) getModifiedSchemaFuture := workflow.ExecuteActivity(getModifiedSchemaCtx, flowable.GetTableSchema, &protos.GetTableSchemaBatchInput{ PeerConnectionConfig: cfg.Source, @@ -432,15 +411,29 @@ func CDCFlowWorkflowWithConfig( } } } - ctx = workflow.WithValue(ctx, "flowName", cfg.FlowJobName) + + normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) + if err != nil { + return state, err + } + + childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ + WorkflowID: normalizeFlowID, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 20, + }, + SearchAttributes: mirrorNameSearch, + } + normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( - ctx, + normCtx, NormalizeFlowWorkflow, cfg, ) var childNormalizeFlowRes *model.NormalizeResponse - if err := childNormalizeFlowFuture.Get(ctx, &childNormalizeFlowRes); err != nil { + if err := childNormalizeFlowFuture.Get(normCtx, &childNormalizeFlowRes); err != nil { w.logger.Error("failed to execute normalize flow: ", err) state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error()) } else { diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index af14e11b8f..39256eac1a 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -51,7 +51,6 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( HeartbeatTimeout: 5 * time.Minute, }) - // execute StartFlow on the peers to start the flow startNormalizeInput := &protos.StartNormalizeInput{ FlowConnectionConfigs: config, } From 78ef62acc1d5cce33f9f59c85e8bba97c444046e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 15:50:37 +0000 Subject: [PATCH 3/3] GetLastSyncAndNormalizeBatchID: return struct instead of two int64s --- flow/connectors/bigquery/bigquery.go | 41 ++++++++++++++++---------- flow/connectors/postgres/cdc.go | 4 +-- flow/connectors/snowflake/snowflake.go | 38 ++++++++++++++---------- flow/model/model.go | 5 ++++ 4 files changed, 56 insertions(+), 32 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 725a44f65b..2b927741dd 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -382,21 +382,21 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) { } } -func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (int64, int64, error) { +func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (model.SyncAndNormalizeBatchID, error) { query := fmt.Sprintf("SELECT sync_batch_id, normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) q := c.client.Query(query) it, err := q.Read(c.ctx) if err != nil { err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - return -1, -1, err + return model.SyncAndNormalizeBatchID{}, err } var row []bigquery.Value err = it.Next(&row) if err != nil { c.logger.Info("no row found for job") - return 0, 0, nil + return model.SyncAndNormalizeBatchID{}, nil } syncBatchID := int64(0) @@ -407,7 +407,10 @@ func (c *BigQueryConnector) GetLastSyncAndNormalizeBatchID(jobName string) (int6 if row[1] != nil { normBatchID = row[1].(int64) } - return syncBatchID, normBatchID, nil + return model.SyncAndNormalizeBatchID{ + SyncBatchID: syncBatchID, + NormalizeBatchID: normBatchID, + }, nil } func (c *BigQueryConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -749,7 +752,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro( func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { rawTableName := c.getRawTableName(req.FlowJobName) - syncBatchID, normalizeBatchID, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) + batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) if err != nil { return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } @@ -760,20 +763,28 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) } // if job is not yet found in the peerdb_mirror_jobs_table // OR sync is lagging end normalize - if !hasJob || normalizeBatchID >= syncBatchID { + if !hasJob || batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID { c.logger.Info("waiting for sync to catch up, so finishing") return &model.NormalizeResponse{ Done: false, - StartBatchID: normalizeBatchID, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID, + EndBatchID: batchIDs.SyncBatchID, }, nil } - distinctTableNames, err := c.getDistinctTableNamesInBatch(req.FlowJobName, syncBatchID, normalizeBatchID) + distinctTableNames, err := c.getDistinctTableNamesInBatch( + req.FlowJobName, + batchIDs.SyncBatchID, + batchIDs.NormalizeBatchID, + ) if err != nil { return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err) } - tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, syncBatchID, normalizeBatchID) + tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols( + req.FlowJobName, + batchIDs.SyncBatchID, + batchIDs.NormalizeBatchID, + ) if err != nil { return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } @@ -789,8 +800,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) NormalizedTable: tableName, RawTable: rawTableName, NormalizedTableSchema: c.tableNameSchemaMapping[tableName], - SyncBatchID: syncBatchID, - NormalizeBatchID: normalizeBatchID, + SyncBatchID: batchIDs.SyncBatchID, + NormalizeBatchID: batchIDs.NormalizeBatchID, UnchangedToastColumns: tableNametoUnchangedToastCols[tableName], peerdbCols: &protos.PeerDBColumns{ SoftDeleteColName: req.SoftDeleteColName, @@ -805,7 +816,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) // update metadata to make the last normalized batch id to the recent last sync batch id. updateMetadataStmt := fmt.Sprintf( "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", - c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName) + c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName) stmts = append(stmts, updateMetadataStmt) query := strings.Join(stmts, "\n") @@ -816,8 +827,8 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return &model.NormalizeResponse{ Done: true, - StartBatchID: normalizeBatchID + 1, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID + 1, + EndBatchID: batchIDs.SyncBatchID, }, nil } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index b3686f4d09..2be3fcb2a5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -360,7 +360,6 @@ func (p *PostgresCDCSource) consumeStream( p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) rec, err := p.processMessage(records, xld, clientXLogPos) - if err != nil { return fmt.Errorf("error processing message: %w", err) } @@ -470,7 +469,8 @@ func (p *PostgresCDCSource) consumeStream( } func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData, - currentClientXlogPos pglogrepl.LSN) (model.Record, error) { + currentClientXlogPos pglogrepl.LSN, +) (model.Record, error) { logicalMsg, err := pglogrepl.Parse(xld.WALData) if err != nil { return nil, fmt.Errorf("error parsing logical message: %w", err) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index bbed7d2a55..5d2c7e03b1 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -345,23 +345,27 @@ func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) { return result.Int64, nil } -func (c *SnowflakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (int64, int64, error) { +func (c *SnowflakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (model.SyncAndNormalizeBatchID, error) { rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncNormalizeBatchID_SQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) if err != nil { - return 0, 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) + return model.SyncAndNormalizeBatchID{}, + fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) } var syncResult, normResult pgtype.Int8 if !rows.Next() { c.logger.Warn("No row found, returning 0") - return 0, 0, nil + return model.SyncAndNormalizeBatchID{}, nil } err = rows.Scan(&syncResult, &normResult) if err != nil { - return 0, 0, fmt.Errorf("error while reading result row: %w", err) + return model.SyncAndNormalizeBatchID{}, fmt.Errorf("error while reading result row: %w", err) } - return syncResult.Int64, normResult.Int64, nil + return model.SyncAndNormalizeBatchID{ + SyncBatchID: syncResult.Int64, + NormalizeBatchID: normResult.Int64, + }, nil } func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, @@ -590,16 +594,16 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( // NormalizeRecords normalizes raw table to destination table. func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { - syncBatchID, normalizeBatchID, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) + batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) if err != nil { return nil, err } // normalize has caught up with sync, chill until more records are loaded. - if normalizeBatchID >= syncBatchID { + if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID { return &model.NormalizeResponse{ Done: false, - StartBatchID: normalizeBatchID, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID, + EndBatchID: batchIDs.SyncBatchID, }, nil } @@ -613,12 +617,16 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest Done: false, }, nil } - destinationTableNames, err := c.getDistinctTableNamesInBatch(req.FlowJobName, syncBatchID, normalizeBatchID) + destinationTableNames, err := c.getDistinctTableNamesInBatch( + req.FlowJobName, + batchIDs.SyncBatchID, + batchIDs.NormalizeBatchID, + ) if err != nil { return nil, err } - tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, syncBatchID, normalizeBatchID) + tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(req.FlowJobName, batchIDs.SyncBatchID, batchIDs.NormalizeBatchID) if err != nil { return nil, fmt.Errorf("couldn't tablename to unchanged cols mapping: %w", err) } @@ -636,7 +644,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest tableName, tableNametoUnchangedToastCols[tableName], getRawTableIdentifier(req.FlowJobName), - syncBatchID, normalizeBatchID, + batchIDs.SyncBatchID, batchIDs.NormalizeBatchID, req) if err != nil { c.logger.Error("[merge] error while normalizing records", slog.Any("error", err)) @@ -653,15 +661,15 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest } // updating metadata with new normalizeBatchID - err = c.updateNormalizeMetadata(req.FlowJobName, syncBatchID) + err = c.updateNormalizeMetadata(req.FlowJobName, batchIDs.SyncBatchID) if err != nil { return nil, err } return &model.NormalizeResponse{ Done: true, - StartBatchID: normalizeBatchID + 1, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID + 1, + EndBatchID: batchIDs.SyncBatchID, }, nil } diff --git a/flow/model/model.go b/flow/model/model.go index 581b57178b..9b8213c8cd 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -416,6 +416,11 @@ func (r *CDCRecordStream) GetRecords() chan Record { return r.records } +type SyncAndNormalizeBatchID struct { + SyncBatchID int64 + NormalizeBatchID int64 +} + type SyncRecordsRequest struct { Records *CDCRecordStream // FlowJobName is the name of the flow job.