BigQuery: sequentially merge batches and update normalise batch id one at a time (#1454)
Amogh-Bharadwaj authored Mar 10, 2024
1 parent facfe8b commit fead7b2
Showing 2 changed files with 62 additions and 50 deletions.
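
In short: normalization used to run once over the whole un-normalized range (normBatchID, SyncBatchID] and only then advance the normalize batch id; this commit walks that range one batch at a time and checkpoints after each batch. A minimal sketch of the new control flow, with hypothetical mergeBatch/saveWatermark helpers standing in for the connector's per-batch MERGE and the pgMetadata.UpdateNormalizeBatchID call (not PeerDB's actual API):

package main

import "fmt"

// normalize walks (normBatchID, syncBatchID] one batch at a time,
// persisting the watermark after every batch. mergeBatch and
// saveWatermark are hypothetical stand-ins.
func normalize(normBatchID, syncBatchID int64,
	mergeBatch func(int64) error,
	saveWatermark func(int64) error,
) error {
	for batchID := normBatchID + 1; batchID <= syncBatchID; batchID++ {
		if err := mergeBatch(batchID); err != nil {
			return fmt.Errorf("merge batch %d: %w", batchID, err)
		}
		// Checkpoint immediately: a crash now resumes after this batch,
		// not all the way back at normBatchID.
		if err := saveWatermark(batchID); err != nil {
			return fmt.Errorf("save watermark for batch %d: %w", batchID, err)
		}
	}
	return nil
}

func main() {
	var merged []int64
	err := normalize(4, 7,
		func(id int64) error { merged = append(merged, id); return nil },
		func(int64) error { return nil },
	)
	fmt.Println(merged, err) // [5 6 7] <nil>
}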
101 changes: 58 additions & 43 deletions flow/connectors/bigquery/bigquery.go
@@ -337,15 +337,14 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(ctx context.Context, jobName
 func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 	ctx context.Context,
 	flowJobName string,
-	syncBatchID int64,
-	normalizeBatchID int64,
+	batchId int64,
 ) ([]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
 	// Prepare the query to retrieve distinct tables in that batch
 	query := fmt.Sprintf(`SELECT DISTINCT _peerdb_destination_table_name FROM %s
-		WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d`,
-		rawTableName, normalizeBatchID, syncBatchID)
+		WHERE _peerdb_batch_id = %d`,
+		rawTableName, batchId)
 	// Run the query
 	q := c.client.Query(query)
 	q.DefaultProjectID = c.projectID
@@ -379,8 +378,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch(
 func (c *BigQueryConnector) getTableNametoUnchangedCols(
 	ctx context.Context,
 	flowJobName string,
-	syncBatchID int64,
-	normalizeBatchID int64,
+	batchId int64,
 ) (map[string][]string, error) {
 	rawTableName := c.getRawTableName(flowJobName)
 
@@ -390,9 +388,9 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(
 	// we don't want these particular DeleteRecords to be used in the update statement
 	query := fmt.Sprintf(`SELECT _peerdb_destination_table_name,
 	array_agg(DISTINCT _peerdb_unchanged_toast_columns) as unchanged_toast_columns FROM %s
-		WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d AND _peerdb_record_type != 2
+		WHERE _peerdb_batch_id = %d AND _peerdb_record_type != 2
 		GROUP BY _peerdb_destination_table_name`,
-		rawTableName, normalizeBatchID, syncBatchID)
+		rawTableName, batchId)
 	// Run the query
 	q := c.client.Query(query)
 	q.DefaultDatasetID = c.datasetID
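
Both raw-table scans above now pin a single batch. To make the query change concrete, here is a small standalone Go program that prints the old range scan next to the new single-batch scan (table name and batch ids invented for illustration):

package main

import "fmt"

func main() {
	rawTable := "mydataset._peerdb_raw_mirror" // hypothetical raw table name
	normBatchID, syncBatchID, batchId := int64(4), int64(7), int64(5)

	// Before: one scan covering every batch in the un-normalized range.
	before := fmt.Sprintf(`SELECT DISTINCT _peerdb_destination_table_name FROM %s
	WHERE _peerdb_batch_id > %d and _peerdb_batch_id <= %d`,
		rawTable, normBatchID, syncBatchID)

	// After: one scan per loop iteration, pinned to a single batch id.
	after := fmt.Sprintf(`SELECT DISTINCT _peerdb_destination_table_name FROM %s
	WHERE _peerdb_batch_id = %d`,
		rawTable, batchId)

	fmt.Println(before)
	fmt.Println(after)
}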
@@ -468,7 +466,8 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 	return res, nil
 }
 
-// NormalizeRecords normalizes raw table to destination table.
+// NormalizeRecords normalizes raw table to destination table,
+// one batch at a time from the previous normalized batch to the currently synced batch.
 func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
 	rawTableName := c.getRawTableName(req.FlowJobName)
 
@@ -486,31 +485,58 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 		}, nil
 	}
 
-	distinctTableNames, err := c.getDistinctTableNamesInBatch(
+	for batchId := normBatchID + 1; batchId <= req.SyncBatchID; batchId++ {
+		mergeErr := c.mergeTablesInThisBatch(ctx, batchId,
+			req.FlowJobName, rawTableName, req.TableNameSchemaMapping,
+			&protos.PeerDBColumns{
+				SoftDeleteColName: req.SoftDeleteColName,
+				SyncedAtColName:   req.SyncedAtColName,
+				SoftDelete:        req.SoftDelete,
+			})
+		if mergeErr != nil {
+			return nil, mergeErr
+		}
+
+		err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchId)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return &model.NormalizeResponse{
+		Done:         true,
+		StartBatchID: normBatchID + 1,
+		EndBatchID:   req.SyncBatchID,
+	}, nil
+}
+
+func (c *BigQueryConnector) mergeTablesInThisBatch(
+	ctx context.Context,
+	batchId int64,
+	flowName string,
+	rawTableName string,
+	tableToSchema map[string]*protos.TableSchema,
+	peerdbColumns *protos.PeerDBColumns,
+) error {
+	tableNames, err := c.getDistinctTableNamesInBatch(
 		ctx,
-		req.FlowJobName,
-		req.SyncBatchID,
-		normBatchID,
+		flowName,
+		batchId,
 	)
 	if err != nil {
-		return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
+		return fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
 	}
 
 	tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
 		ctx,
-		req.FlowJobName,
-		req.SyncBatchID,
-		normBatchID,
+		flowName,
+		batchId,
 	)
 	if err != nil {
-		return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
+		return fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
 	}
 
-	// append all the statements to one list
-	c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
-		c.datasetID, rawTableName, distinctTableNames))
-
-	for _, tableName := range distinctTableNames {
+	for _, tableName := range tableNames {
 		unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
 		dstDatasetTable, _ := c.convertToDatasetTable(tableName)
 		mergeGen := &mergeStmtGenerator{
@@ -521,15 +547,10 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 			},
 			dstTableName:    tableName,
 			dstDatasetTable: dstDatasetTable,
-			normalizedTableSchema: req.TableNameSchemaMapping[tableName],
-			syncBatchID:           req.SyncBatchID,
-			normalizeBatchID:      normBatchID,
-			peerdbCols: &protos.PeerDBColumns{
-				SoftDeleteColName: req.SoftDeleteColName,
-				SyncedAtColName:   req.SyncedAtColName,
-				SoftDelete:        req.SoftDelete,
-			},
-			shortColumn: map[string]string{},
+			normalizedTableSchema: tableToSchema[tableName],
+			mergeBatchId:          batchId,
+			peerdbCols:            peerdbColumns,
+			shortColumn:           map[string]string{},
 		}
 
 		// normalize anything between last normalized batch id to last sync batchid
@@ -553,20 +574,14 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
 			return nil
 		})
 		if err != nil {
-			return nil, err
+			return err
 		}
 	}
 
-	err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
-	if err != nil {
-		return nil, err
-	}
-
-	return &model.NormalizeResponse{
-		Done:         true,
-		StartBatchID: normBatchID + 1,
-		EndBatchID:   req.SyncBatchID,
-	}, nil
+	// append all the statements to one list
+	c.logger.Info(fmt.Sprintf("merged raw records to corresponding tables: %s %s %v",
+		c.datasetID, rawTableName, tableNames))
+	return nil
 }
 
 // CreateRawTable creates a raw table, implementing the Connector interface.
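
The practical payoff of the per-batch watermark: after a failure, a retry re-merges at most the batch that was in flight, instead of every batch since the last normalize. A toy sketch quantifying that difference (hypothetical replay helper, not PeerDB code):

package main

import "fmt"

// replay returns how many already-merged batches a retry would merge again
// after a crash right after batch crashAfter, depending on whether the
// watermark advances per batch (new) or only at the very end (old).
func replay(normBatchID, crashAfter int64, perBatchWatermark bool) int64 {
	watermark := normBatchID // old behavior: never moves before the crash
	if perBatchWatermark {
		watermark = crashAfter // new behavior: checkpointed after each batch
	}
	return crashAfter - watermark
}

func main() {
	// Batches 5..10 pending, crash right after batch 8 was merged.
	fmt.Println("old:", replay(4, 8, false)) // old: 4 (batches 5-8 merged again)
	fmt.Println("new:", replay(4, 8, true))  // new: 0 (retry resumes at batch 9)
}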
11 changes: 4 additions & 7 deletions flow/connectors/bigquery/merge_stmt_generator.go
@@ -18,10 +18,8 @@ type mergeStmtGenerator struct {
 	dstTableName string
 	// dataset + destination table
 	dstDatasetTable datasetTable
-	// last synced batchID.
-	syncBatchID int64
-	// last normalized batchID.
-	normalizeBatchID int64
+	// batch id currently to be merged
+	mergeBatchId int64
 	// the schema of the table to merge into
 	normalizedTableSchema *protos.TableSchema
 	// _PEERDB_IS_DELETED and _SYNCED_AT columns
@@ -94,10 +92,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 
 	// normalize anything between last normalized batch id to last sync batchid
 	return fmt.Sprintf("WITH _f AS "+
-		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d AND "+
+		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id=%d AND "+
 		"_peerdb_destination_table_name='%s')",
-		strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.normalizeBatchID,
-		m.syncBatchID, m.dstTableName)
+		strings.Join(flattenedProjs, ","), m.rawDatasetTable.string(), m.mergeBatchId, m.dstTableName)
 }
 
 // This function is to support datatypes like JSON which cannot be partitioned by or compared by BigQuery
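
On the generator side, the flattened CTE now filters on the single mergeBatchId. Rendered with invented values (the field names follow the diff; the projection list, table names, and batch id are made up for illustration), the generated clause looks like this:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Stand-ins for the generator's fields; values invented for illustration.
	flattenedProjs := []string{"_peerdb_uid", "_peerdb_timestamp"}
	rawDatasetTable := "mydataset._peerdb_raw_mirror"
	mergeBatchId := int64(7)
	dstTableName := "public.users"

	cte := fmt.Sprintf("WITH _f AS "+
		"(SELECT %s FROM `%s` WHERE _peerdb_batch_id=%d AND "+
		"_peerdb_destination_table_name='%s')",
		strings.Join(flattenedProjs, ","), rawDatasetTable, mergeBatchId, dstTableName)
	fmt.Println(cte)
	// WITH _f AS (SELECT _peerdb_uid,_peerdb_timestamp FROM `mydataset._peerdb_raw_mirror`
	// WHERE _peerdb_batch_id=7 AND _peerdb_destination_table_name='public.users')
}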
