From fead7b267808734b5da7dec696112380e0595be4 Mon Sep 17 00:00:00 2001
From: Amogh Bharadwaj
Date: Sun, 10 Mar 2024 05:34:35 +0530
Subject: [PATCH] BigQuery: sequentially merge batches and update normalise
 batch id one at a time (#1454)

---
 flow/connectors/bigquery/bigquery.go          | 101 ++++++++++--------
 .../bigquery/merge_stmt_generator.go          |  11 +-
 2 files changed, 62 insertions(+), 50 deletions(-)

diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 5963cedc10..9c117824ad 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -337,15 +337,14 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(ctx context.Context, jobName
 func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 	ctx context.Context,
 	flowJobName string,
-	syncBatchID int64,
-	normalizeBatchID int64,
+	batchId int64,
 ) ([]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
 	// Prepare the query to retrieve distinct tables in that batch
 	query := fmt.Sprintf(`SELECT DISTINCT _peerdb_destination_table_name FROM %s
-	WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d`,
-		rawTableName, normalizeBatchID, syncBatchID)
+	WHERE _peerdb_batch_id = %d`,
+		rawTableName, batchId)
 	// Run the query
 	q := c.client.Query(query)
 	q.DefaultProjectID = c.projectID
@@ -379,8 +378,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 func (c *BigQueryConnector) getTableNametoUnchangedCols(
 	ctx context.Context,
 	flowJobName string,
-	syncBatchID int64,
-	normalizeBatchID int64,
+	batchId int64,
 ) (map[string][]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
@@ -390,9 +388,9 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(
 	// we don't want these particular DeleteRecords to be used in the update statement
 	query := fmt.Sprintf(`SELECT _peerdb_destination_table_name,
 	array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s
-	WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2
+	WHERE _peerdb_batch_id = %d AND _peerdb_record_type != 2
 	GROUP BY _peerdb_destination_table_name`,
-		rawTableName, normalizeBatchID, syncBatchID)
+		rawTableName, batchId)
 	// Run the query
 	q := c.client.Query(query)
 	q.DefaultDatasetID = c.datasetID
@@ -468,7 +466,8 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 	return res, nil
 }
 
-// NormalizeRecords normalizes raw table to destination table.
+// NormalizeRecords normalizes raw table to destination table,
+// one batch at a time from the previous normalized batch to the currently synced batch.
 func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
 	rawTableName := c.getRawTableName(req.FlowJobName)
 
@@ -486,31 +485,58 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 		}, nil
 	}
 
-	distinctTableNames, err := c.getDistinctTableNamesInBatch(
+	for batchId := normBatchID + 1; batchId <= req.SyncBatchID; batchId++ {
+		mergeErr := c.mergeTablesInThisBatch(ctx, batchId,
+			req.FlowJobName, rawTableName, req.TableNameSchemaMapping,
+			&protos.PeerDBColumns{
+				SoftDeleteColName: req.SoftDeleteColName,
+				SyncedAtColName:   req.SyncedAtColName,
+				SoftDelete:        req.SoftDelete,
+			})
+		if mergeErr != nil {
+			return nil, mergeErr
+		}
+
+		err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchId)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return &model.NormalizeResponse{
+		Done:         true,
+		StartBatchID: normBatchID + 1,
+		EndBatchID:   req.SyncBatchID,
+	}, nil
+}
+
+func (c *BigQueryConnector) mergeTablesInThisBatch(
+	ctx context.Context,
+	batchId int64,
+	flowName string,
+	rawTableName string,
+	tableToSchema map[string]*protos.TableSchema,
+	peerdbColumns *protos.PeerDBColumns,
+) error {
+	tableNames, err := c.getDistinctTableNamesInBatch(
 		ctx,
-		req.FlowJobName,
-		req.SyncBatchID,
-		normBatchID,
+		flowName,
+		batchId,
 	)
 	if err != nil {
-		return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
+		return fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
 	}
 
 	tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
 		ctx,
-		req.FlowJobName,
-		req.SyncBatchID,
-		normBatchID,
+		flowName,
+		batchId,
 	)
 	if err != nil {
-		return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
+		return fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
 	}
 
-	// append all the statements to one list
-	c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
-		c.datasetID, rawTableName, distinctTableNames))
-
-	for _, tableName := range distinctTableNames {
+	for _, tableName := range tableNames {
 		unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
 		dstDatasetTable, _ := c.convertToDatasetTable(tableName)
 		mergeGen := &mergeStmtGenerator{
@@ -521,15 +547,10 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 			},
 			dstTableName:    tableName,
 			dstDatasetTable: dstDatasetTable,
-			normalizedTableSchema: req.TableNameSchemaMapping[tableName],
-			syncBatchID:           req.SyncBatchID,
-			normalizeBatchID:      normBatchID,
-			peerdbCols: &protos.PeerDBColumns{
-				SoftDeleteColName: req.SoftDeleteColName,
-				SyncedAtColName:   req.SyncedAtColName,
-				SoftDelete:        req.SoftDelete,
-			},
-			shortColumn: map[string]string{},
+			normalizedTableSchema: tableToSchema[tableName],
+			mergeBatchId:          batchId,
+			peerdbCols:            peerdbColumns,
+			shortColumn:           map[string]string{},
 		}
 
-		// normalize anything between last normalized batch id to last sync batchid
+		// merge the raw rows for this batch into the destination table
@@ -553,20 +574,14 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 			return nil
 		})
 		if err != nil {
-			return nil, err
+			return err
 		}
 	}
 
-	err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
-	if err != nil {
-		return nil, err
-	}
-
-	return &model.NormalizeResponse{
-		Done:         true,
-		StartBatchID: normBatchID + 1,
-		EndBatchID:   req.SyncBatchID,
-	}, nil
+	// log the tables merged in this batch
+	c.logger.Info(fmt.Sprintf("merged raw records to corresponding tables: %s %s %v",
+		c.datasetID, rawTableName, tableNames))
+	return nil
 }
 
 // CreateRawTable creates a raw table, implementing the Connector interface.
diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go
index e7810f6a8f..57fd00417e 100644
--- a/flow/connectors/bigquery/merge_stmt_generator.go
+++ b/flow/connectors/bigquery/merge_stmt_generator.go
@@ -18,10 +18,8 @@ type mergeStmtGenerator struct {
 	dstTableName string
 	// dataset + destination table
 	dstDatasetTable datasetTable
-	// last synced batchID.
-	syncBatchID int64
-	// last normalized batchID.
-	normalizeBatchID int64
+	// the batch id currently being merged
+	mergeBatchId int64
 	// the schema of the table to merge into
 	normalizedTableSchema *protos.TableSchema
 	// _PEERDB_IS_DELETED and _SYNCED_AT columns
@@ -94,10 +92,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 
-	// normalize anything between last normalized batch id to last sync batchid
+	// flatten the raw rows belonging to the single batch id being merged
 	return fmt.Sprintf("WITH _f AS "+
-		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d AND "+
+		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id=%d AND "+
 		"_peerdb_destination_table_name='%s')",
-		strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.normalizeBatchID,
-		m.syncBatchID, m.dstTableName)
+		strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.mergeBatchId, m.dstTableName)
 }
 
 // This function is to support datatypes like JSON which cannot be partitioned by or compared by BigQuery
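
Note: as a quick illustration of the merge-then-advance pattern this patch introduces, below is a minimal, self-contained Go sketch. mergeBatch and updateNormalizeBatchID are hypothetical stand-ins for c.mergeTablesInThisBatch and c.pgMetadata.UpdateNormalizeBatchID; only the loop structure mirrors the patch. Advancing the normalize batch id once per merged batch means a failure part-way leaves the metadata pointing at the last fully merged batch, so a retry resumes there instead of re-merging the whole range.

package main

import (
	"context"
	"fmt"
)

// mergeBatch is a hypothetical stand-in for BigQueryConnector.mergeTablesInThisBatch:
// it merges the raw rows tagged with exactly one batch id into the destination tables.
func mergeBatch(ctx context.Context, batchID int64) error {
	fmt.Printf("merging batch %d\n", batchID)
	return nil
}

// updateNormalizeBatchID is a hypothetical stand-in for pgMetadata.UpdateNormalizeBatchID:
// it records that a batch has been fully normalized.
func updateNormalizeBatchID(ctx context.Context, batchID int64) error {
	fmt.Printf("normalize batch id advanced to %d\n", batchID)
	return nil
}

func main() {
	ctx := context.Background()
	normBatchID, syncBatchID := int64(3), int64(6)

	// Walk each un-normalized batch in order; advance the recorded batch id
	// only after its merge succeeds, so a mid-loop failure never marks
	// unmerged batches as normalized.
	for batchID := normBatchID + 1; batchID <= syncBatchID; batchID++ {
		if err := mergeBatch(ctx, batchID); err != nil {
			fmt.Println("merge failed:", err)
			return
		}
		if err := updateNormalizeBatchID(ctx, batchID); err != nil {
			fmt.Println("metadata update failed:", err)
			return
		}
	}
}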