diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 5963cedc10..27144af763 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -337,15 +337,14 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(ctx context.Context, jobName
 func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 	ctx context.Context,
 	flowJobName string,
-	syncBatchID int64,
-	normalizeBatchID int64,
+	batchId int64,
 ) ([]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
 	// Prepare the query to retrieve distinct tables in that batch
 	query := fmt.Sprintf(`SELECT DISTINCT _peerdb_destination_table_name FROM %s
-	 WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d`,
-		rawTableName, normalizeBatchID, syncBatchID)
+	 WHERE _peerdb_batch_id = %d`,
+		rawTableName, batchId)
 	// Run the query
 	q := c.client.Query(query)
 	q.DefaultProjectID = c.projectID
@@ -379,8 +378,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 func (c *BigQueryConnector) getTableNametoUnchangedCols(
 	ctx context.Context,
 	flowJobName string,
-	syncBatchID int64,
-	normalizeBatchID int64,
+	batchId int64,
 ) (map[string][]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
@@ -390,9 +388,9 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(
 	// we don't want these particular DeleteRecords to be used in the update statement
 	query := fmt.Sprintf(`SELECT _peerdb_destination_table_name,
 	array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s
-	 WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2
+	 WHERE _peerdb_batch_id = %d AND _peerdb_record_type != 2
 	 GROUP BY _peerdb_destination_table_name`,
-		rawTableName, normalizeBatchID, syncBatchID)
+		rawTableName, batchId)
 	// Run the query
 	q := c.client.Query(query)
 	q.DefaultDatasetID = c.datasetID
@@ -486,82 +484,80 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 		}, nil
 	}
 
-	distinctTableNames, err := c.getDistinctTableNamesInBatch(
-		ctx,
-		req.FlowJobName,
-		req.SyncBatchID,
-		normBatchID,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
-	}
-
-	tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
-		ctx,
-		req.FlowJobName,
-		req.SyncBatchID,
-		normBatchID,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
-	}
-
-	// append all the statements to one list
-	c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
-		c.datasetID, rawTableName, distinctTableNames))
+	for batchId := normBatchID + 1; batchId <= req.SyncBatchID; batchId++ {
+		distinctTableNames, err := c.getDistinctTableNamesInBatch(
+			ctx,
+			req.FlowJobName,
+			batchId,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
+		}
 
-	for _, tableName := range distinctTableNames {
-		unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
-		dstDatasetTable, _ := c.convertToDatasetTable(tableName)
-		mergeGen := &mergeStmtGenerator{
-			rawDatasetTable: datasetTable{
-				project: c.projectID,
-				dataset: c.datasetID,
-				table:   rawTableName,
-			},
-			dstTableName:          tableName,
-			dstDatasetTable:       dstDatasetTable,
-			normalizedTableSchema: req.TableNameSchemaMapping[tableName],
-			syncBatchID:           req.SyncBatchID,
-			normalizeBatchID:      normBatchID,
-			peerdbCols: &protos.PeerDBColumns{
-				SoftDeleteColName: req.SoftDeleteColName,
-				SyncedAtColName:   req.SyncedAtColName,
-				SoftDelete:        req.SoftDelete,
-			},
-			shortColumn: map[string]string{},
+		tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
+			ctx,
+			req.FlowJobName,
+			batchId,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
 		}
 
-		// normalize anything between last normalized batch id to last sync batchid
-		// TODO (kaushik): This is so that the statement size for individual merge statements
-		// doesn't exceed the limit. We should make this configurable.
-		const batchSize = 8
-		stmtNum := 0
-		err = shared.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
-			stmtNum += 1
-			mergeStmt := mergeGen.generateMergeStmt(chunk)
-			c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
-				stmtNum, tableName))
-
-			q := c.client.Query(mergeStmt)
-			q.DefaultProjectID = c.projectID
-			q.DefaultDatasetID = dstDatasetTable.dataset
-			_, err := q.Read(ctx)
+		// append all the statements to one list
+		c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
+			c.datasetID, rawTableName, distinctTableNames))
+
+		for _, tableName := range distinctTableNames {
+			unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
+			dstDatasetTable, _ := c.convertToDatasetTable(tableName)
+			mergeGen := &mergeStmtGenerator{
+				rawDatasetTable: datasetTable{
+					project: c.projectID,
+					dataset: c.datasetID,
+					table:   rawTableName,
+				},
+				dstTableName:          tableName,
+				dstDatasetTable:       dstDatasetTable,
+				normalizedTableSchema: req.TableNameSchemaMapping[tableName],
+				batchIdForThisMerge:   batchId,
+				peerdbCols: &protos.PeerDBColumns{
+					SoftDeleteColName: req.SoftDeleteColName,
+					SyncedAtColName:   req.SyncedAtColName,
+					SoftDelete:        req.SoftDelete,
+				},
+				shortColumn: map[string]string{},
+			}
+
+			// normalize anything between last normalized batch id to last sync batchid
+			// TODO (kaushik): This is so that the statement size for individual merge statements
+			// doesn't exceed the limit. We should make this configurable.
+			const batchSize = 8
+			stmtNum := 0
+			err = shared.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
+				stmtNum += 1
+				mergeStmt := mergeGen.generateMergeStmt(chunk)
+				c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
+					stmtNum, tableName))
+
+				q := c.client.Query(mergeStmt)
+				q.DefaultProjectID = c.projectID
+				q.DefaultDatasetID = dstDatasetTable.dataset
+				_, err := q.Read(ctx)
+				if err != nil {
+					return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
+				}
+				return nil
+			})
 			if err != nil {
-				return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
+				return nil, err
 			}
-			return nil
-		})
+		}
+
+		err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
 		if err != nil {
 			return nil, err
 		}
 	}
-
-	err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
-	if err != nil {
-		return nil, err
-	}
-
 	return &model.NormalizeResponse{
 		Done:         true,
 		StartBatchID: normBatchID + 1,
diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go
index e7810f6a8f..4d8078ba06 100644
--- a/flow/connectors/bigquery/merge_stmt_generator.go
+++ b/flow/connectors/bigquery/merge_stmt_generator.go
@@ -19,9 +19,7 @@ type mergeStmtGenerator struct {
 	// dataset + destination table
 	dstDatasetTable datasetTable
 	// last synced batchID.
-	syncBatchID int64
-	// last normalized batchID.
-	normalizeBatchID int64
+	batchIdForThisMerge int64
 	// the schema of the table to merge into
 	normalizedTableSchema *protos.TableSchema
 	// _PEERDB_IS_DELETED and _SYNCED_AT columns
@@ -94,10 +92,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 
 	// normalize anything between last normalized batch id to last sync batchid
 	return fmt.Sprintf("WITH _f AS "+
-		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d AND "+
+		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id=%d AND "+
 		"_peerdb_destination_table_name='%s')",
-		strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.normalizeBatchID,
-		m.syncBatchID, m.dstTableName)
+		strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.batchIdForThisMerge, m.dstTableName)
 }
 
 // This function is to support datatypes like JSON which cannot be partitioned by or compared by BigQuery
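
Taken together, the change replaces one merge over the half-open batch range (normalizeBatchID, syncBatchID] with one merge pass per batch id, and the flattened CTE is now scoped to a single _peerdb_batch_id value. Below is a minimal, self-contained sketch of that control flow; buildBatchCTE and normalizeBatches are illustrative names for this sketch, not the connector's actual API.

package main

import (
	"fmt"
	"strings"
)

// buildBatchCTE mirrors the shape of the updated generateFlattenedCTE: the raw-table
// scan is filtered to a single batch id instead of a (normalized, synced] range.
// Table and column names here are illustrative only.
func buildBatchCTE(rawTable, dstTable string, batchID int64, projections []string) string {
	return fmt.Sprintf("WITH _f AS "+
		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id=%d AND "+
		"_peerdb_destination_table_name='%s')",
		strings.Join(projections, ","), rawTable, batchID, dstTable)
}

// normalizeBatches sketches the new loop in NormalizeRecords: walk every batch id
// after the last normalized one up to the last synced one, merging each batch on
// its own. The real connector also records progress inside this loop via
// UpdateNormalizeBatchID.
func normalizeBatches(normBatchID, syncBatchID int64, mergeBatch func(batchID int64) error) error {
	for batchID := normBatchID + 1; batchID <= syncBatchID; batchID++ {
		if err := mergeBatch(batchID); err != nil {
			return fmt.Errorf("normalize failed at batch %d: %w", batchID, err)
		}
	}
	return nil
}

func main() {
	// Print the per-batch CTE that would drive each merge pass.
	_ = normalizeBatches(3, 5, func(batchID int64) error {
		fmt.Println(buildBatchCTE("dataset.raw_peerdb", "public.users", batchID,
			[]string{"_peerdb_destination_table_name", "_peerdb_data"}))
		return nil
	})
}

Scoping each pass to one batch keeps the generated MERGE statements smaller, at the cost of running the raw-table metadata queries once per batch instead of once per normalize call.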