refactor
Amogh-Bharadwaj committed Mar 9, 2024
1 parent f71e132 commit 5ce2781
Showing 1 changed file with 85 additions and 66 deletions.
151 changes: 85 additions & 66 deletions flow/connectors/bigquery/bigquery.go
@@ -466,7 +466,8 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
return res, nil
}

// NormalizeRecords normalizes raw table to destination table.
// NormalizeRecords normalizes the raw table to the destination table,
// one batch at a time, from the previous normalized batch up to the currently synced batch.
func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

@@ -485,86 +486,104 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
}

for batchId := normBatchID + 1; batchId <= req.SyncBatchID; batchId++ {
distinctTableNames, err := c.getDistinctTableNamesInBatch(
ctx,
req.FlowJobName,
batchId,
)
if err != nil {
return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
}

tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
ctx,
req.FlowJobName,
batchId,
)
if err != nil {
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
}

// append all the statements to one list
c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, distinctTableNames))

for _, tableName := range distinctTableNames {
unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
dstDatasetTable, _ := c.convertToDatasetTable(tableName)
mergeGen := &mergeStmtGenerator{
rawDatasetTable: datasetTable{
project: c.projectID,
dataset: c.datasetID,
table: rawTableName,
},
dstTableName: tableName,
dstDatasetTable: dstDatasetTable,
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
batchIdForThisMerge: batchId,
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
},
shortColumn: map[string]string{},
}

// normalize anything between last normalized batch id to last sync batchid
// TODO (kaushik): This is so that the statement size for individual merge statements
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
stmtNum := 0
err = shared.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
stmtNum += 1
mergeStmt := mergeGen.generateMergeStmt(chunk)
c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
stmtNum, tableName))

q := c.client.Query(mergeStmt)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = dstDatasetTable.dataset
_, err := q.Read(ctx)
if err != nil {
return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
}
return nil
mergeErr := c.MergeTablesInThisBatch(ctx, batchId,
req.FlowJobName, rawTableName, req.TableNameSchemaMapping,
&protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
})
if err != nil {
return nil, err
}
if mergeErr != nil {
return nil, mergeErr
}

err = c.pgMetadata.UpdateNormalizeBatchID(ctx, req.FlowJobName, req.SyncBatchID)
if err != nil {
return nil, err
}
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: normBatchID + 1,
EndBatchID: req.SyncBatchID,
}, nil
}

func (c *BigQueryConnector) MergeTablesInThisBatch(
ctx context.Context,
batchId int64,
flowName string,
rawTableName string,
tableToSchema map[string]*protos.TableSchema,
peerdbColumns *protos.PeerDBColumns,
) error {
tableNames, err := c.getDistinctTableNamesInBatch(
ctx,
flowName,
batchId,
)
if err != nil {
return fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
}

tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
ctx,
flowName,
batchId,
)
if err != nil {
return fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
}

for _, tableName := range tableNames {
unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
dstDatasetTable, _ := c.convertToDatasetTable(tableName)
mergeGen := &mergeStmtGenerator{
rawDatasetTable: datasetTable{
project: c.projectID,
dataset: c.datasetID,
table: rawTableName,
},
dstTableName: tableName,
dstDatasetTable: dstDatasetTable,
normalizedTableSchema: tableToSchema[tableName],
batchIdForThisMerge: batchId,
peerdbCols: peerdbColumns,
shortColumn: map[string]string{},
}

// Chunk the unchanged TOAST columns so that the statement size for
// individual merge statements doesn't exceed BigQuery's limit.
// TODO (kaushik): We should make this chunk size configurable.
const batchSize = 8
stmtNum := 0
err = shared.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
stmtNum += 1
mergeStmt := mergeGen.generateMergeStmt(chunk)
c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
stmtNum, tableName))

q := c.client.Query(mergeStmt)
q.DefaultProjectID = c.projectID
q.DefaultDatasetID = dstDatasetTable.dataset
_, err := q.Read(ctx)
if err != nil {
return fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
}
return nil
})
if err != nil {
return err
}
}

// log the tables merged from the raw table in this batch
c.logger.Info(fmt.Sprintf("merged raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, tableNames))
return nil
}

// CreateRawTable creates a raw table, implementing the Connector interface.
// create a table with the following schema
// _peerdb_uid STRING
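For context on the TODO above, here is a minimal, self-contained sketch of the chunking pattern that shared.ArrayIterChunks appears to implement. iterChunks is a hypothetical stand-in; the real helper lives in PeerDB's shared package and may handle edge cases such as empty input differently:

package main

import "fmt"

// iterChunks invokes fn on successive sub-slices of at most size elements,
// stopping at the first error, mirroring how the merge statements above are
// generated for at most 8 unchanged TOAST columns at a time.
func iterChunks(items []string, size int, fn func(chunk []string) error) error {
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		if err := fn(items[start:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Ten hypothetical column names, chunked 8 at a time: two merge statements.
	cols := []string{"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"}
	stmtNum := 0
	_ = iterChunks(cols, 8, func(chunk []string) error {
		stmtNum++
		fmt.Printf("merge statement %d covers columns %v\n", stmtNum, chunk)
		return nil
	})
}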

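And a distilled model of the control flow this refactor introduces, using hypothetical stand-ins for the real methods; the actual code also threads through the BigQuery client, schema mappings, and PeerDB column config:

package main

import "fmt"

// mergeBatch stands in for MergeTablesInThisBatch: merge one batch of raw
// records into the destination tables.
func mergeBatch(batchID int64) error {
	fmt.Printf("merging batch %d\n", batchID)
	return nil
}

// normalize mirrors the refactored NormalizeRecords loop: visit every batch
// after the last normalized batch up to the last synced batch, merging each
// in order.
func normalize(lastNormalized, lastSynced int64) error {
	for batchID := lastNormalized + 1; batchID <= lastSynced; batchID++ {
		if err := mergeBatch(batchID); err != nil {
			return err
		}
	}
	// The real code then persists the watermark via
	// pgMetadata.UpdateNormalizeBatchID so a later run resumes
	// from the first unmerged batch.
	return nil
}

func main() {
	_ = normalize(3, 6) // merges batches 4, 5, and 6
}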