Skip to content

Commit

Permalink
another refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 14, 2024
1 parent 872072b commit a6ac593
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 15 deletions.
8 changes: 6 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,12 +629,16 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
if err != nil {
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
}

// append all the statements to one list
c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, distinctTableNames))

for _, tableName := range distinctTableNames {
normalizeTableSchema := req.TableNameSchemaMapping[tableName]
if len(normalizeTableSchema.PrimaryKeyColumns) == 0 {
c.logger.Info(fmt.Sprintf("skipping merge for table %s as it has no primary key", tableName))
continue
}
unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
dstDatasetTable, _ := c.convertToDatasetTable(tableName)
mergeGen := &mergeStmtGenerator{
Expand All @@ -645,7 +649,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
},
dstTableName: tableName,
dstDatasetTable: dstDatasetTable,
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
normalizedTableSchema: normalizeTableSchema,
syncBatchID: req.SyncBatchID,
normalizeBatchID: normBatchID,
peerdbCols: &protos.PeerDBColumns{
Expand Down
16 changes: 3 additions & 13 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package connbigquery
import (
"errors"
"fmt"
"log/slog"
"strings"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -126,7 +125,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
}

// generateMergeStmt generates a merge statement.
func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) (string, error) {
func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) string {
// comma separated list of column names
columnCount := len(m.normalizedTableSchema.ColumnNames)
backtickColNames := make([]string, 0, columnCount)
Expand Down Expand Up @@ -162,9 +161,6 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) (
}
// t.<pkey1> = d.<pkey1> AND t.<pkey2> = d.<pkey2> ...
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")
if pkeySelectSQL == "" {
return "", ErrNoPrimaryKey
}

deletePart := "DELETE"
if m.peerdbCols.SoftDelete {
Expand All @@ -181,7 +177,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) (
"INSERT (%s) VALUES(%s) "+
"%s WHEN MATCHED AND _d._rt=2 THEN %s;",
m.dstDatasetTable.table, m.generateFlattenedCTE(), m.generateDeDupedCTE(),
pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart), nil
pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
}

func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastColas []string) []string {
Expand All @@ -192,13 +188,7 @@ func (m *mergeStmtGenerator) generateMergeStmts(allUnchangedToastColas []string)

mergeStmts := make([]string, 0, len(partitions))
for _, partition := range partitions {
mergeStmt, err := m.generateMergeStmt(partition)
if err == ErrNoPrimaryKey {
slog.Warn("No primary key found for a table. Skipping it.",
slog.String("table", m.dstTableName))
continue
}
mergeStmts = append(mergeStmts, mergeStmt)
mergeStmts = append(mergeStmts, m.generateMergeStmt(partition))
}

return mergeStmts
Expand Down

0 comments on commit a6ac593

Please sign in to comment.