
making BQ merge a single statement without temp tables (#889)
heavycrystal authored Dec 24, 2023
1 parent 7e1813a commit b160753
Showing 3 changed files with 17 additions and 41 deletions.
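
In effect, this commit changes the shape of the SQL that the normalize step generates per target table. A rough before/after sketch with illustrative identifiers (my_dataset.my_table, the id key column, and the CTE names and bodies are placeholders; the real fragments come from generateFlattenedCTE and generateDeDupedCTE):

    -- Before: three statements, staging the deduplicated rows in a temp table
    CREATE TEMP TABLE _peerdb_de_duplicated_data_ab12c AS (
        WITH _peerdb_flattened AS (
            SELECT ...  -- CAST + JSON_EXTRACT_SCALAR over the raw staging rows
        ),
        _peerdb_de_duplicated AS (
            SELECT ...  -- latest record per primary key
        )
        SELECT * FROM _peerdb_de_duplicated
    );
    MERGE my_dataset.my_table _peerdb_target
        USING _peerdb_de_duplicated_data_ab12c _peerdb_deduped
        ON _peerdb_target.id = _peerdb_deduped.id
        ...;
    DROP TABLE _peerdb_de_duplicated_data_ab12c;

    -- After: a single MERGE, with the same CTEs inlined as the source subquery
    MERGE my_dataset.my_table _peerdb_target
        USING (
            WITH _peerdb_flattened AS (
                SELECT ...  -- CAST + JSON_EXTRACT_SCALAR over the raw staging rows
            ),
            _peerdb_de_duplicated AS (
                SELECT ...  -- latest record per primary key
            )
            SELECT * FROM _peerdb_de_duplicated
        ) _peerdb_deduped
        ON _peerdb_target.id = _peerdb_deduped.id
        ...;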
14 changes: 5 additions & 9 deletions flow/connectors/bigquery/bigquery.go
@@ -233,11 +233,11 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc
 	return nil
 }
 
-func (c *BigQueryConnector) WaitForTableReady(tblName string) error {
+func (c *BigQueryConnector) waitForTableReady(tblName string) error {
 	table := c.client.Dataset(c.datasetID).Table(tblName)
 	maxDuration := 5 * time.Minute
 	deadline := time.Now().Add(maxDuration)
-	sleepInterval := 15 * time.Second
+	sleepInterval := 5 * time.Second
 	attempt := 0
 
 	for {
@@ -816,20 +816,16 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 			},
 		}
 		// normalize anything between the last normalized batch id and the last sync batch id
-		mergeStmts := mergeGen.generateMergeStmts()
-		stmts = append(stmts, mergeStmts...)
+		mergeStmt := mergeGen.generateMergeStmt()
+		stmts = append(stmts, mergeStmt)
 	}
 	// update metadata to set the last normalized batch id to the latest sync batch id
 	updateMetadataStmt := fmt.Sprintf(
-		"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';",
+		"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
 		c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
 	stmts = append(stmts, updateMetadataStmt)
 	stmts = append(stmts, "COMMIT TRANSACTION;")
 
-	// put this within a transaction
-	// TODO - not truncating rows in staging table as of now.
-	// err = c.truncateTable(staging...)
-
 	_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err)
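With the temp tables gone, the whole normalize batch fits in one multi-statement query. Roughly, strings.Join(stmts, "\n") now yields a script like the following (assuming stmts is seeded with BEGIN TRANSACTION earlier in the function, outside this hunk; dataset, table, and job names are illustrative):

    BEGIN TRANSACTION;
    MERGE my_dataset.table_a _peerdb_target USING (...) _peerdb_deduped ON ...;
    MERGE my_dataset.table_b _peerdb_target USING (...) _peerdb_deduped ON ...;
    UPDATE my_dataset.peerdb_mirror_jobs SET normalize_batch_id=42 WHERE mirror_job_name='my_mirror';
    COMMIT TRANSACTION;
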
41 changes: 11 additions & 30 deletions flow/connectors/bigquery/merge_statement_generator.go
@@ -8,7 +8,6 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

@@ -30,24 +29,6 @@ type mergeStmtGenerator struct {
 	peerdbCols *protos.PeerDBColumns
 }
 
-// GenerateMergeStmt generates a merge statements.
-func (m *mergeStmtGenerator) generateMergeStmts() []string {
-	// return an empty array for now
-	flattenedCTE := m.generateFlattenedCTE()
-	deDupedCTE := m.generateDeDupedCTE()
-	tempTable := fmt.Sprintf("_peerdb_de_duplicated_data_%s", shared.RandomString(5))
-	// create temp table stmt
-	createTempTableStmt := fmt.Sprintf(
-		"CREATE TEMP TABLE %s AS (%s, %s);",
-		tempTable, flattenedCTE, deDupedCTE)
-
-	mergeStmt := m.generateMergeStmt(tempTable, m.peerdbCols)
-
-	dropTempTableStmt := fmt.Sprintf("DROP TABLE %s;", tempTable)
-
-	return []string{createTempTableStmt, mergeStmt, dropTempTableStmt}
-}
-
 // generateFlattenedCTE generates a flattened CTE.
 func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 	// for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR
@@ -129,7 +110,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
 }
 
 // generateMergeStmt generates a merge statement.
-func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *protos.PeerDBColumns) string {
+func (m *mergeStmtGenerator) generateMergeStmt() string {
 	// comma separated list of column names
 	backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
 	pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
@@ -138,13 +119,13 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro
 		pureColNames = append(pureColNames, colName)
 	}
 	csep := strings.Join(backtickColNames, ", ")
-	insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName)
+	insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName)
 	insertValuesSQL := csep + ",CURRENT_TIMESTAMP"
 
 	updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
-		m.UnchangedToastColumns, peerdbCols)
+		m.UnchangedToastColumns, m.peerdbCols)
 	if m.peerdbCols.SoftDelete {
-		softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName)
+		softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", m.peerdbCols.SoftDeleteColName)
 		softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"
 
 		updateStatementsforToastCols = append(updateStatementsforToastCols,
@@ -162,25 +143,25 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro
 	pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")
 
 	deletePart := "DELETE"
-	if peerdbCols.SoftDelete {
-		colName := peerdbCols.SoftDeleteColName
+	if m.peerdbCols.SoftDelete {
+		colName := m.peerdbCols.SoftDeleteColName
 		deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName)
-		if peerdbCols.SyncedAtColName != "" {
+		if m.peerdbCols.SyncedAtColName != "" {
 			deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP",
-				deletePart, peerdbCols.SyncedAtColName)
+				deletePart, m.peerdbCols.SyncedAtColName)
 		}
 	}
 
 	return fmt.Sprintf(`
-	MERGE %s.%s _peerdb_target USING %s _peerdb_deduped
+	MERGE %s.%s _peerdb_target USING (%s,%s) _peerdb_deduped
 	ON %s
 		WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
 			INSERT (%s) VALUES (%s)
 		%s
 		WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
 	%s;
-	`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, insertColumnsSQL, insertValuesSQL,
-		updateStringToastCols, deletePart)
+	`, m.Dataset, m.NormalizedTable, m.generateFlattenedCTE(), m.generateDeDupedCTE(),
+		pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
 }

 /*
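The deletePart logic above produces one of two WHEN MATCHED actions for delete records (_peerdb_record_type = 2). A sketch, assuming _peerdb_is_deleted and _peerdb_synced_at as the configured soft-delete and synced-at column names:

    -- SoftDelete disabled: physically remove the row
    WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
        DELETE

    -- SoftDelete enabled, with a synced-at column configured: mark instead of delete
    WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
        UPDATE SET _peerdb_is_deleted = TRUE, _peerdb_synced_at = CURRENT_TIMESTAMP
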
3 changes: 1 addition & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -343,7 +343,6 @@ func (s *QRepAvroSyncMethod) writeToStage(
 		slog.String("batchOrPartitionID", syncID),
 	)
 	if s.gcsBucket != "" {
-
 		bucket := s.connector.storageClient.Bucket(s.gcsBucket)
 		avroFilePath := fmt.Sprintf("%s/%s.avro", objectFolder, syncID)
 		obj := bucket.Object(avroFilePath)
@@ -415,7 +414,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
 	}
 	slog.Info(fmt.Sprintf("Pushed into %s/%s", avroFile.FilePath, syncID))
 
-	err = s.connector.WaitForTableReady(stagingTable)
+	err = s.connector.waitForTableReady(stagingTable)
 	if err != nil {
 		return 0, fmt.Errorf("failed to wait for table to be ready: %w", err)
 	}
