Normalize until one more (#1007)
iskakaushik authored Jan 6, 2024
1 parent 19bdb49 commit 0e50e32
Showing 1 changed file with 69 additions and 59 deletions.
128 changes: 69 additions & 59 deletions flow/connectors/bigquery/bigquery.go
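
What changed, in brief: NormalizeRecords used to merge everything between the last normalized batch ID and the latest sync batch ID in a single pass; after this commit it walks forward one batch at a time, running the merge statements and committing the normalize_batch_id metadata for each batch before moving on. Below is a minimal, self-contained Go sketch of that loop; the hypothetical normalizeBatch callback stands in for the per-batch work (merge statements plus the metadata UPDATE) shown in the full hunk further down, and only the control flow is taken from the diff.

	package main

	import (
		"fmt"
		"log"
	)

	// normalizeOneByOne illustrates the control flow introduced by this commit:
	// instead of normalizing every pending batch in one pass, each batch is
	// normalized (and its metadata committed) individually. normalizeBatch is a
	// hypothetical stand-in for the per-batch work done in NormalizeRecords.
	func normalizeOneByOne(normalizeBatchID, syncBatchID int64,
		normalizeBatch func(batchID, prevBatchID int64) error,
	) (int64, error) {
		prevNormalizedUntil := normalizeBatchID
		normalizeUntil := normalizeBatchID
		for normalizeUntil < syncBatchID {
			normalizeUntil += 1
			// Normalize exactly one batch: (prevNormalizedUntil, normalizeUntil].
			if err := normalizeBatch(normalizeUntil, prevNormalizedUntil); err != nil {
				return prevNormalizedUntil, err
			}
			prevNormalizedUntil = normalizeUntil
			log.Printf("normalized batch %d / %d", normalizeUntil, syncBatchID)
		}
		return normalizeUntil, nil
	}

	func main() {
		endBatchID, err := normalizeOneByOne(3, 6, func(batchID, prevBatchID int64) error {
			fmt.Printf("merging raw records for batch %d (prev %d)\n", batchID, prevBatchID)
			return nil
		})
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("normalized up to batch", endBatchID)
	}

Because the metadata update now happens inside the loop, a failure partway through leaves normalize_batch_id at the last fully normalized batch instead of discarding all progress.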
@@ -564,76 +564,86 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 			EndBatchID:   batchIDs.SyncBatchID,
 		}, nil
 	}
-	distinctTableNames, err := c.getDistinctTableNamesInBatch(
-		req.FlowJobName,
-		batchIDs.SyncBatchID,
-		batchIDs.NormalizeBatchID,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
-	}
-
-	tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
-		req.FlowJobName,
-		batchIDs.SyncBatchID,
-		batchIDs.NormalizeBatchID,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
-	}
-
-	// append all the statements to one list
-	c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
-		c.datasetID, rawTableName, distinctTableNames))
-
-	for _, tableName := range distinctTableNames {
-		dstDatasetTable, _ := c.convertToDatasetTable(tableName)
-		mergeGen := &mergeStmtGenerator{
-			rawDatasetTable: &datasetTable{
-				dataset: c.datasetID,
-				table:   rawTableName,
-			},
-			dstTableName:          tableName,
-			dstDatasetTable:       dstDatasetTable,
-			normalizedTableSchema: c.tableNameSchemaMapping[tableName],
-			syncBatchID:           batchIDs.SyncBatchID,
-			normalizeBatchID:      batchIDs.NormalizeBatchID,
-			unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
-			peerdbCols: &protos.PeerDBColumns{
-				SoftDeleteColName: req.SoftDeleteColName,
-				SyncedAtColName:   req.SyncedAtColName,
-				SoftDelete:        req.SoftDelete,
-			},
-			shortColumn: map[string]string{},
-		}
-
-		// normalize anything between last normalized batch id to last sync batchid
-		mergeStmts := mergeGen.generateMergeStmts()
-
-		// run the merge statement
-		for i, mergeStmt := range mergeStmts {
-			c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
-				i+1, len(mergeStmts), tableName))
-			q := c.client.Query(mergeStmt)
-			_, err = q.Read(c.ctx)
-			if err != nil {
-				return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
-			}
-		}
-	}
-	// update metadata to make the last normalized batch id to the recent last sync batch id.
-	updateMetadataStmt := fmt.Sprintf(
-		"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
-		c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName)
-	_, err = c.client.Query(updateMetadataStmt).Read(c.ctx)
-	if err != nil {
-		return nil, fmt.Errorf("failed to update metadata table: %v", err)
-	}
+	prevNormalizedUntil := batchIDs.NormalizeBatchID
+	normalizeUntil := batchIDs.NormalizeBatchID
+	for normalizeUntil < batchIDs.SyncBatchID {
+		normalizeUntil += 1
+		distinctTableNames, err := c.getDistinctTableNamesInBatch(
+			req.FlowJobName,
+			normalizeUntil,
+			prevNormalizedUntil,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
+		}
+
+		tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
+			req.FlowJobName,
+			normalizeUntil,
+			prevNormalizedUntil,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
+		}
+
+		// append all the statements to one list
+		c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
+			c.datasetID, rawTableName, distinctTableNames))
+
+		for _, tableName := range distinctTableNames {
+			dstDatasetTable, _ := c.convertToDatasetTable(tableName)
+			mergeGen := &mergeStmtGenerator{
+				rawDatasetTable: &datasetTable{
+					dataset: c.datasetID,
+					table:   rawTableName,
+				},
+				dstTableName:          tableName,
+				dstDatasetTable:       dstDatasetTable,
+				normalizedTableSchema: c.tableNameSchemaMapping[tableName],
+				syncBatchID:           normalizeUntil,
+				normalizeBatchID:      prevNormalizedUntil,
+				unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
+				peerdbCols: &protos.PeerDBColumns{
+					SoftDeleteColName: req.SoftDeleteColName,
+					SyncedAtColName:   req.SyncedAtColName,
+					SoftDelete:        req.SoftDelete,
+				},
+				shortColumn: map[string]string{},
+			}
+
+			// normalize anything between last normalized batch id to last sync batchid
+			mergeStmts := mergeGen.generateMergeStmts()
+
+			// run the merge statement
+			for i, mergeStmt := range mergeStmts {
+				c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
+					i+1, len(mergeStmts), tableName))
+				q := c.client.Query(mergeStmt)
+				_, err = q.Read(c.ctx)
+				if err != nil {
+					return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
+				}
+			}
+		}
+
+		// update metadata to make the last normalized batch id to the recent last sync batch id.
+		updateMetadataStmt := fmt.Sprintf(
+			"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
+			c.datasetID, MirrorJobsTable, normalizeUntil, req.FlowJobName)
+		_, err = c.client.Query(updateMetadataStmt).Read(c.ctx)
+		if err != nil {
+			return nil, fmt.Errorf("failed to update metadata table: %v", err)
+		}
+
+		prevNormalizedUntil = normalizeUntil
+		c.logger.Info(fmt.Sprintf("normalized batch %d / %d", normalizeUntil, batchIDs.SyncBatchID))
+	}

 	return &model.NormalizeResponse{
 		Done:         true,
 		StartBatchID: batchIDs.NormalizeBatchID + 1,
-		EndBatchID:   batchIDs.SyncBatchID,
+		EndBatchID:   normalizeUntil,
 	}, nil
 }

