Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BigQuery: sequentially merge batches and update normalize batch id one at a time #1454

Merged
merged 4 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 58 additions & 43 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,14 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(ctx context.Context, jobName
func (c *BigQueryConnector) getDistinctTableNamesInBatch(
ctx context.Context,
flowJobName string,
syncBatchID int64,
normalizeBatchID int64,
batchId int64,
) ([]string, error) {
rawTableName := c.getRawTableName(flowJobName)

// Prepare the query to retrieve distinct tables in that batch
query := fmt.Sprintf(`SELECT DISTINCT _peerdb_destination_table_name FROM %s
WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d`,
rawTableName, normalizeBatchID, syncBatchID)
WHERE _peerdb_batch_id = %d`,
rawTableName, batchId)
// Run the query
q := c.client.Query(query)
q.DefaultProjectID = c.projectID
Expand Down Expand Up @@ -379,8 +378,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
func (c *BigQueryConnector) getTableNametoUnchangedCols(
ctx context.Context,
flowJobName string,
syncBatchID int64,
normalizeBatchID int64,
batchId int64,
) (map[string][]string, error) {
rawTableName := c.getRawTableName(flowJobName)

Expand All @@ -390,9 +388,9 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(
// we don't want these particular DeleteRecords to be used in the update statement
query := fmt.Sprintf(`SELECT _peerdb_destination_table_name,
array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s
WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2
WHERE _peerdb_batch_id = %d AND _peerdb_record_type != 2
GROUP BY _peerdb_destination_table_name`,
rawTableName, normalizeBatchID, syncBatchID)
rawTableName, batchId)
// Run the query
q := c.client.Query(query)
q.DefaultDatasetID = c.datasetID
Expand Down Expand Up @@ -468,7 +466,8 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
return res, nil
}

// NormalizeRecords normalizes raw table to destination table.
// NormalizeRecords normalizes raw table to destination table,
// one batch at a time from the previous normalized batch to the currently synced batch.
func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

Expand All @@ -486,31 +485,58 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
}, nil
}

distinctTableNames, err := c.getDistinctTableNamesInBatch(
for batchId := normBatchID + 1; batchId <= req.SyncBatchID; batchId++ {
mergeErr := c.MergeTablesInThisBatch(ctx, batchId,
req.FlowJobName, rawTableName, req.TableNameSchemaMapping,
&protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
})
if mergeErr != nil {
return nil, err
}

err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchId)
if err != nil {
return nil, err
}
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
}

func (c *BigQueryConnector) MergeTablesInThisBatch(
ctx context.Context,
batchId int64,
flowName string,
rawTableName string,
tableToSchema map[string]*protos.TableSchema,
peerdbColumns *protos.PeerDBColumns,
) error {
tableNames, err := c.getDistinctTableNamesInBatch(
ctx,
req.FlowJobName,
req.SyncBatchID,
normBatchID,
flowName,
batchId,
)
if err != nil {
return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
return fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
}

tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
ctx,
req.FlowJobName,
req.SyncBatchID,
normBatchID,
flowName,
batchId,
)
if err != nil {
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
return fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
}

// append all the statements to one list
c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, distinctTableNames))

for _, tableName := range distinctTableNames {
for _, tableName := range tableNames {
unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
dstDatasetTable, _ := c.convertToDatasetTable(tableName)
mergeGen := &mergeStmtGenerator{
Expand All @@ -521,15 +547,10 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
},
dstTableName: tableName,
dstDatasetTable: dstDatasetTable,
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
syncBatchID: req.SyncBatchID,
normalizeBatchID: normBatchID,
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
},
shortColumn: map[string]string{},
normalizedTableSchema: tableToSchema[tableName],
batchIdForThisMerge: batchId,
peerdbCols: peerdbColumns,
shortColumn: map[string]string{},
}

// normalize anything between last normalized batch id to last sync batchid
Expand All @@ -553,20 +574,14 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
return nil
})
if err != nil {
return nil, err
return err
}
}

err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
if err != nil {
return nil, err
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
// log the tables that were merged in this batch
c.logger.Info(fmt.Sprintf("merged raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, tableNames))
return nil
}

// CreateRawTable creates a raw table, implementing the Connector interface.
Expand Down
9 changes: 3 additions & 6 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ type mergeStmtGenerator struct {
// dataset + destination table
dstDatasetTable datasetTable
// the batch id currently being merged.
syncBatchID int64
// last normalized batchID.
normalizeBatchID int64
batchIdForThisMerge int64
// the schema of the table to merge into
normalizedTableSchema *protos.TableSchema
// _PEERDB_IS_DELETED and _SYNCED_AT columns
Expand Down Expand Up @@ -94,10 +92,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {

// select only the records belonging to this batch id for normalization
return fmt.Sprintf("WITH _f AS "+
"(SELECT %s FROM `%s` WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d AND "+
"(SELECT %s FROM `%s` WHERE _peerdb_batch_id=%d AND "+
"_peerdb_destination_table_name='%s')",
strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.normalizeBatchID,
m.syncBatchID, m.dstTableName)
strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.batchIdForThisMerge, m.dstTableName)
}

// This function is to support datatypes like JSON which cannot be partitioned by or compared by BigQuery
Expand Down
Loading