sequentially merge batches and update normalise batch id one at a time
Amogh-Bharadwaj committed Mar 9, 2024
1 parent facfe8b commit f71e132
Showing 2 changed files with 74 additions and 81 deletions.
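
In outline: before this commit, NormalizeRecords merged the whole pending range (normBatchID, req.SyncBatchID] in a single pass and only then advanced the stored normalize batch ID; after it, each batch is merged on its own and the normalize batch ID is advanced as soon as that batch lands, so a failure mid-normalize re-does at most one batch instead of the whole backlog. A minimal sketch of the resulting pattern, with hypothetical processBatch and saveCheckpoint stand-ins rather than the actual PeerDB helpers:

package main

import "fmt"

// processBatch is a hypothetical stand-in for merging one batch of raw records.
func processBatch(batchId int64) error {
	fmt.Printf("merged batch %d\n", batchId)
	return nil
}

// saveCheckpoint is a hypothetical stand-in for persisting the last
// normalized batch id (what UpdateNormalizeBatchID does in the diff below).
func saveCheckpoint(batchId int64) error {
	fmt.Printf("checkpointed batch %d\n", batchId)
	return nil
}

func main() {
	var lastNormalized, lastSynced int64 = 3, 6
	// Work through each pending batch in order, checkpointing after every
	// one, so progress survives a crash between batches.
	for batchId := lastNormalized + 1; batchId <= lastSynced; batchId++ {
		if err := processBatch(batchId); err != nil {
			panic(err)
		}
		if err := saveCheckpoint(batchId); err != nil {
			panic(err)
		}
	}
}
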
146 changes: 71 additions & 75 deletions flow/connectors/bigquery/bigquery.go
@@ -337,15 +337,14 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(ctx context.Context, jobName
 func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 	ctx context.Context,
 	flowJobName string,
-	syncBatchID int64,
-	normalizeBatchID int64,
+	batchId int64,
 ) ([]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
 	// Prepare the query to retrieve distinct tables in that batch
 	query := fmt.Sprintf(`SELECT DISTINCT _peerdb_destination_table_name FROM %s
-		WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d`,
-		rawTableName, normalizeBatchID, syncBatchID)
+		WHERE _peerdb_batch_id = %d`,
+		rawTableName, batchId)
 	// Run the query
 	q := c.client.Query(query)
 	q.DefaultProjectID = c.projectID
@@ -379,8 +378,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 func (c *BigQueryConnector) getTableNametoUnchangedCols(
 	ctx context.Context,
 	flowJobName string,
-	syncBatchID int64,
-	normalizeBatchID int64,
+	batchId int64,
 ) (map[string][]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
@@ -390,9 +388,9 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(
 	// we don't want these particular DeleteRecords to be used in the update statement
 	query := fmt.Sprintf(`SELECT _peerdb_destination_table_name,
 		array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s
-		WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2
+		WHERE _peerdb_batch_id = %d AND _peerdb_record_type != 2
 		GROUP BY _peerdb_destination_table_name`,
-		rawTableName, normalizeBatchID, syncBatchID)
+		rawTableName, batchId)
 	// Run the query
 	q := c.client.Query(query)
 	q.DefaultDatasetID = c.datasetID
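
Both raw-table queries above narrow from a batch-id range to a single batch id. For illustration, here is the updated template from getDistinctTableNamesInBatch rendered with hypothetical inputs (the raw table name and batch id are made up; only the format string comes from the diff):

package main

import "fmt"

func main() {
	rawTableName := "_peerdb_raw_mirror_a" // hypothetical raw table name
	var batchId int64 = 7

	// Updated template: selects destination tables for exactly one batch.
	query := fmt.Sprintf(`SELECT DISTINCT _peerdb_destination_table_name FROM %s
		WHERE _peerdb_batch_id = %d`,
		rawTableName, batchId)
	fmt.Println(query)
	// Before this commit the predicate was a range:
	//   WHERE _peerdb_batch_id > <normalizeBatchID> AND _peerdb_batch_id <= <syncBatchID>
}
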
@@ -486,82 +484,80 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest)
 		}, nil
 	}
 
-	distinctTableNames, err := c.getDistinctTableNamesInBatch(
-		ctx,
-		req.FlowJobName,
-		req.SyncBatchID,
-		normBatchID,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
-	}
-
-	tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
-		ctx,
-		req.FlowJobName,
-		req.SyncBatchID,
-		normBatchID,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
-	}
-
-	// append all the statements to one list
-	c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
-		c.datasetID, rawTableName, distinctTableNames))
-
-	for _, tableName := range distinctTableNames {
-		unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
-		dstDatasetTable, _ := c.convertToDatasetTable(tableName)
-		mergeGen := &mergeStmtGenerator{
-			rawDatasetTable: datasetTable{
-				project: c.projectID,
-				dataset: c.datasetID,
-				table:   rawTableName,
-			},
-			dstTableName:          tableName,
-			dstDatasetTable:       dstDatasetTable,
-			normalizedTableSchema: req.TableNameSchemaMapping[tableName],
-			syncBatchID:           req.SyncBatchID,
-			normalizeBatchID:      normBatchID,
-			peerdbCols: &protos.PeerDBColumns{
-				SoftDeleteColName: req.SoftDeleteColName,
-				SyncedAtColName:   req.SyncedAtColName,
-				SoftDelete:        req.SoftDelete,
-			},
-			shortColumn: map[string]string{},
-		}
-
-		// normalize anything between last normalized batch id to last sync batchid
-		// TODO (kaushik): This is so that the statement size for individual merge statements
-		// doesn't exceed the limit. We should make this configurable.
-		const batchSize = 8
-		stmtNum := 0
-		err = shared.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
-			stmtNum += 1
-			mergeStmt := mergeGen.generateMergeStmt(chunk)
-			c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
-				stmtNum, tableName))
-
-			q := c.client.Query(mergeStmt)
-			q.DefaultProjectID = c.projectID
-			q.DefaultDatasetID = dstDatasetTable.dataset
-			_, err := q.Read(ctx)
-			if err != nil {
-				return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
-			}
-			return nil
-		})
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
-	if err != nil {
-		return nil, err
-	}
+	for batchId := normBatchID + 1; batchId <= req.SyncBatchID; batchId++ {
+		distinctTableNames, err := c.getDistinctTableNamesInBatch(
+			ctx,
+			req.FlowJobName,
+			batchId,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
+		}
+
+		tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
+			ctx,
+			req.FlowJobName,
+			batchId,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
+		}
+
+		// append all the statements to one list
+		c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
+			c.datasetID, rawTableName, distinctTableNames))
+
+		for _, tableName := range distinctTableNames {
+			unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
+			dstDatasetTable, _ := c.convertToDatasetTable(tableName)
+			mergeGen := &mergeStmtGenerator{
+				rawDatasetTable: datasetTable{
+					project: c.projectID,
+					dataset: c.datasetID,
+					table:   rawTableName,
+				},
+				dstTableName:          tableName,
+				dstDatasetTable:       dstDatasetTable,
+				normalizedTableSchema: req.TableNameSchemaMapping[tableName],
+				batchIdForThisMerge:   batchId,
+				peerdbCols: &protos.PeerDBColumns{
+					SoftDeleteColName: req.SoftDeleteColName,
+					SyncedAtColName:   req.SyncedAtColName,
+					SoftDelete:        req.SoftDelete,
+				},
+				shortColumn: map[string]string{},
+			}
+
+			// normalize anything between last normalized batch id to last sync batchid
+			// TODO (kaushik): This is so that the statement size for individual merge statements
+			// doesn't exceed the limit. We should make this configurable.
+			const batchSize = 8
+			stmtNum := 0
+			err = shared.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
+				stmtNum += 1
+				mergeStmt := mergeGen.generateMergeStmt(chunk)
+				c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
+					stmtNum, tableName))
+
+				q := c.client.Query(mergeStmt)
+				q.DefaultProjectID = c.projectID
+				q.DefaultDatasetID = dstDatasetTable.dataset
+				_, err := q.Read(ctx)
+				if err != nil {
+					return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
+				}
+				return nil
+			})
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchId)
+		if err != nil {
+			return nil, err
+		}
+	}
 
 	return &model.NormalizeResponse{
 		Done:          true,
 		StartBatchID:  normBatchID + 1,
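
The shared.ArrayIterChunks call in the hunk above splits a table's unchanged-TOAST column sets into chunks of batchSize = 8 and emits one MERGE statement per chunk, per the TODO, so that no single statement exceeds BigQuery's size limit. A rough sketch of that chunked iteration, with a hypothetical iterChunks stand-in rather than the shared package's actual implementation:

package main

import "fmt"

// iterChunks walks a slice in fixed-size windows, invoking fn once per chunk
// and stopping at the first error (hypothetical stand-in for shared.ArrayIterChunks).
func iterChunks(items []string, size int, fn func(chunk []string) error) error {
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		if err := fn(items[start:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	cols := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	_ = iterChunks(cols, 8, func(chunk []string) error {
		fmt.Printf("would generate one merge statement for %v\n", chunk)
		return nil
	})
}
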
9 changes: 3 additions & 6 deletions flow/connectors/bigquery/merge_stmt_generator.go
@@ -19,9 +19,7 @@ type mergeStmtGenerator struct {
 	// dataset + destination table
 	dstDatasetTable datasetTable
-	// last synced batchID.
-	syncBatchID int64
-	// last normalized batchID.
-	normalizeBatchID int64
+	batchIdForThisMerge int64
 	// the schema of the table to merge into
 	normalizedTableSchema *protos.TableSchema
 	// _PEERDB_IS_DELETED and _SYNCED_AT columns
@@ -94,10 +92,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 
 	// normalize anything between last normalized batch id to last sync batchid
 	return fmt.Sprintf("WITH _f AS "+
-		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d AND "+
+		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id=%d AND "+
 		"_peerdb_destination_table_name='%s')",
-		strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.normalizeBatchID,
-		m.syncBatchID, m.dstTableName)
+		strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.batchIdForThisMerge, m.dstTableName)
 }
 
 // This function is to support datatypes like JSON which cannot be partitioned by or compared by BigQuery
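
For a concrete sense of the reworked flattened CTE, here is the updated format string rendered with hypothetical inputs (the projection list, raw table, batch id, and destination table are made up; only the format string comes from the diff):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// All inputs below are hypothetical stand-ins for the generator's fields.
	flattenedProjs := []string{"_peerdb_uid", "_peerdb_timestamp"}
	rawTable := "proj.dataset._peerdb_raw_mirror_a"
	var batchIdForThisMerge int64 = 7
	dstTableName := "public.users"

	// Format string as updated by this commit: equality on a single batch id
	// instead of the old (normalizeBatchID, syncBatchID] range.
	cte := fmt.Sprintf("WITH _f AS "+
		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id=%d AND "+
		"_peerdb_destination_table_name='%s')",
		strings.Join(flattenedProjs, ","), rawTable, batchIdForThisMerge, dstTableName)
	fmt.Println(cte)
}
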
